Last active March 27, 2021 03:37
#!/usr/bin/env python3
import subprocess
import threading
import shlex
import time
import argparse
# Candidate hosts for the test.
# NOTE(review): the entries of this list and its closing bracket are missing
# from this copy of the file; they must be restored before the script can run.
hosts = [
# One flag per entry of `hosts`; get_avail_by_mask keeps the hosts whose flag
# is non-zero.
hosts_mask = [1,1,1,1]
def get_avail_by_mask(hosts, mask):
    """Return the entries of `hosts` whose matching `mask` flag is non-zero.

    `mask` and `hosts` are paired position by position; pairing stops at the
    end of the shorter of the two.  The original order is preserved.
    """
    return [host for flag, host in zip(mask, hosts) if flag != 0]
# Hosts enabled by the mask, in their original order.
hosts_avail = get_avail_by_mask(hosts, hosts_mask)
# Map every enabled host to its position in hosts_avail.
ip_to_id = dict(zip(hosts_avail, range(len(hosts_avail))))
# Number of ids handed out (one per entry of hosts_avail).
ip_to_id_cnt = len(hosts_avail)
def get_id_from_ip(ip, hosts):
    """Return the index of the first entry of `hosts` that equals `ip`.

    Raises:
        AssertionError: if `ip` does not occur in `hosts`.  The error is
            raised explicitly rather than through an `assert` statement so
            the check is still enforced when Python runs with -O.
    """
    for idx, host in enumerate(hosts):
        if host == ip:
            return idx
    raise AssertionError("no valid id found for ip: {}".format(ip))
def run_task(cmd):
    """Run `cmd` as a child process and return what it wrote.

    `cmd` is one command-line string.  It is tokenized with shlex.split and
    executed directly, without a shell.  The call blocks until the process
    has exited; the exit status is not inspected.

    Returns:
        A (stdout, stderr) pair with the captured output as bytes.
    """
    finished = subprocess.run(
        shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return finished.stdout, finished.stderr
# Example of the report text handled by parse_ib_write_output():
#
# RDMA_Write BW Test
# Dual-port : OFF Device : mlx5_0
# Number of qps : 1 Transport type : IB
# Connection type : RC Using SRQ : OFF
# CQ Moderation : 100
# Mtu : 1024[B]
# Link type : Ethernet
# GID index : 3
# Max inline data : 0[B]
# rdma_cm QPs : ON
# Data ex. method : rdma_cm
# Waiting for client rdma_cm QP to connect
# Please run the same command with the IB/RoCE interface IP
# local address: LID 0000 QPN 0x2e5e PSN 0x9078af
# GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:22:02
# remote address: LID 0000 QPN 0x036e PSN 0x472771
# GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:24:02
# #bytes #iterations BW peak[MB/sec] BW average[MB/sec] MsgRate[Mpps]
# 65536 305338 0.00 26.69 0.050913
def parse_ib_write_output(text):
    """Extract both endpoint addresses and the bandwidth from a report.

    Args:
        text: the report as bytes (a str is accepted too and encoded first).

    Returns:
        A (local_ip, remote_ip, bw) tuple of str.  Each IP is rebuilt from
        the last four colon-separated fields of the line that follows the
        "local" / "remote" address line; bw is the fourth column of the line
        that follows the "#bytes" header line.

    Raises:
        ValueError: if one of the three sections is absent or malformed.
    """
    if isinstance(text, str):
        text = text.encode()
    lines = text.splitlines()

    def extract_ipv4(line):
        # The address is carried in the last four ':'-separated fields.
        fields = line.split(b':')[-4:]
        if len(fields) != 4:
            raise ValueError('malformed GID line: {!r}'.format(line))
        return '.'.join(str(int(field)) for field in fields)

    found = {}
    # Every value of interest sits on the line right after its marker line,
    # so walk the lines pairwise; a marker on the very last line has no
    # value line and is reported as missing below.
    for marker_line, value_line in zip(lines, lines[1:]):
        head = marker_line.lstrip()
        if head.startswith(b'local'):
            found['local_ip'] = extract_ipv4(value_line)
        elif head.startswith(b'remote'):
            found['remote_ip'] = extract_ipv4(value_line)
        elif head.startswith(b'#bytes'):
            columns = value_line.split()
            if len(columns) < 4:
                raise ValueError('malformed bandwidth line: {!r}'.format(value_line))
            found['bw'] = columns[3].decode()

    missing = [key for key in ('local_ip', 'remote_ip', 'bw') if key not in found]
    if missing:
        raise ValueError('report lacks: {}'.format(', '.join(missing)))
    return found['local_ip'], found['remote_ip'], found['bw']
def receiver_task(cmd):
    """Run `cmd` and parse what it printed on stdout as a bandwidth report.

    Returns:
        The (local_ip, remote_ip, bw) tuple from parse_ib_write_output.
    """
    return parse_ib_write_output(run_task(cmd)[0])
def emit_pair(client, server, port, duration):
    """Build the shell commands for one flow from `client` to `server`.

    Returns:
        A list of four strings, in this order:
        the server-side launch (backgrounded, stdout redirected to a log
        under /tmp), the client-side launch (same port, `duration` passed to
        -D, targeting `server`, stdout redirected to its own log), the
        command that prints the server log, and the command that prints the
        client log.
    """
    flow_key = (client, server, port)
    log_of_server = '/tmp/start_flow_server_{}_{}_{}.txt'.format(*flow_key)
    log_of_client = '/tmp/start_flow_client_{}_{}_{}.txt'.format(*flow_key)
    launch = [
        'ib_write_bw -d mlx5_0 -R -p {} --report_gbits > {} &'.format(port, log_of_server),
        'ib_write_bw -d mlx5_0 -R -p {} --report_gbits -F -D {} {} > {} &'.format(
            port, duration, server, log_of_client),
    ]
    fetch = ['cat {}'.format(path) for path in (log_of_server, log_of_client)]
    return launch + fetch
def append_cmds(cmds, host, cmd):
    """Queue `cmd` for `host` in `cmds`, a dict mapping host -> command list.

    A host seen for the first time gets a fresh list; later commands for the
    same host are appended after the ones already queued.
    """
    cmds.setdefault(host, []).append(cmd)
def ssh_submit(cmds, ths):
    """Create one worker thread per host that runs the host's commands via ssh.

    For each (host, command list) item of `cmds`, the commands are joined
    with ';' and followed by a shell `wait`, then wrapped in an ssh
    invocation for that host.  A threading.Thread that calls run_task with
    this command line is appended to `ths`.  Threads are only created here;
    none of them is started.
    """
    ssh_template = 'ssh -p 22 -o StrictHostKeyChecking=no {} "{}"'
    for host, queued in cmds.items():
        remote_script = ';'.join(queued + ['wait'])
        ssh_cmd = ssh_template.format(host, remote_script)
        worker = threading.Thread(target=run_task, args=(ssh_cmd,))
        ths.append(worker)
def ssh_fetch(cmds):
    """Run each fetch command on its host over ssh and collect the reports.

    Args:
        cmds: iterable of [host, command] pairs.

    Returns:
        A list holding one receiver_task() result per pair, in input order.
    """
    flows = []
    for host, cmd in cmds:
        ssh_cmd = 'ssh -p 22 -o StrictHostKeyChecking=no {} "{}"'.format(host, cmd)
        # Execute the command and keep the parsed report; without this call
        # the returned list would always be empty.
        flows.append(receiver_task(ssh_cmd))
    return flows
def main(args):
    """Launch one flow plus a set of competing flows and fetch the reports.

    The first two entries of hosts_avail act as clients and the remaining
    ones as servers.  One flow goes from clients[0] to servers[0], and
    `args.num_competitors` flows go from clients[1] to servers[1]; every
    flow gets its own port, counting up from 18000.

    Args:
        args: parsed options; `args.duration` and `args.num_competitors`
            are read.

    Returns:
        The list produced by ssh_fetch for the server-side logs.
    """
    ths_s = []
    ths_c = []
    servers = hosts_avail[2:]
    clients = hosts_avail[:2]
    cmds_s = {}
    cmds_c = {}
    fetch_cmds_s = []
    base_port = 18000

    def emit_flow(client, server):
        # Register the launch and fetch commands of one flow and advance
        # the port so the next flow does not collide with this one.
        nonlocal base_port
        ret = emit_pair(client, server, base_port, args.duration)
        append_cmds(cmds_s, server, ret[0])
        append_cmds(cmds_c, client, ret[1])
        fetch_cmds_s.append([server, ret[2]])
        base_port += 1

    emit_flow(clients[0], servers[0])
    for _ in range(args.num_competitors):
        emit_flow(clients[1], servers[1])

    ssh_submit(cmds_s, ths_s)
    ssh_submit(cmds_c, ths_c)

    # NOTE(review): the bodies of the three loops below were absent, which
    # left the threads created but never run.  start()/join() is the minimal
    # reading; confirm whether a pause between starting the servers and the
    # clients is also needed.
    for th in ths_s:
        th.start()
    for th in ths_c:
        th.start()
    for th in ths_s + ths_c:
        th.join()

    flows = ssh_fetch(fetch_cmds_s)
    return flows
def add_args(parser):
    """Register the script's command-line options on `parser`.

    Options added:
        -n / --num-competitors  int, default 1
        -D / --duration         int, default 10
    """
    option_table = (
        (('-n', '--num-competitors'),
         dict(type=int, default=1, help='specify the number of competitor flows')),
        (('-D', '--duration'),
         dict(type=int, default=10, help='flow duration (in seconds)')),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
# parse args
# NOTE(review): the ArgumentParser call was left unclosed; any further keyword
# arguments it once carried are unknown and are not reproduced here.
parser = argparse.ArgumentParser(description="Bandwidth allocation test.")
# Without this call the parser knows none of the options that main() reads.
add_args(parser)
args = parser.parse_args()

if __name__ == '__main__':
    main(args)
#!/usr/bin/env python2
import subprocess
import threading
import shlex
import time
# Candidate hosts for the test.
# NOTE(review): the entries of this list and its closing bracket are missing
# from this copy of the file; they must be restored before the script can run.
hosts = [
# One flag per entry of `hosts`; get_avail_by_mask keeps the hosts whose flag
# is non-zero.
hosts_mask = [0,0,0,0,1,1,1,1]
# Alternative selection, kept for reference:
#hosts_mask = [1,1,1,1,0,0,0,0]
def get_avail_by_mask(hosts, mask):
    """Return the entries of `hosts` whose matching `mask` flag is non-zero.

    `mask` and `hosts` are paired position by position; pairing stops at the
    end of the shorter of the two.  The original order is preserved, and an
    empty list is returned when no flag is set.
    """
    # A comprehension avoids indexing the result of zip(), which fails when
    # nothing is selected and is not possible at all where zip() is lazy.
    return [host for flag, host in zip(mask, hosts) if flag != 0]
# Hosts enabled by the mask, in their original order.
hosts_avail = get_avail_by_mask(hosts, hosts_mask)
# Map every enabled host to its position in hosts_avail.
ip_to_id = dict(zip(hosts_avail, range(len(hosts_avail))))
# Number of ids handed out (one per entry of hosts_avail).
ip_to_id_cnt = len(hosts_avail)
def get_id_from_ip(ip, hosts):
    """Return the index of the first entry of `hosts` that equals `ip`.

    Raises:
        AssertionError: if `ip` does not occur in `hosts`.  The error is
            raised explicitly rather than through an `assert` statement so
            the check is still enforced when Python runs with -O.
    """
    for idx, host in enumerate(hosts):
        if host == ip:
            return idx
    raise AssertionError("no valid id found for ip: {}".format(ip))
# Number of hosts taking part in the measurement.
n_worker = len(hosts_avail)
# n_worker x n_worker table filled with 0; receiver_task later stores the
# measured bandwidth at [src_id][dst_id].
bw_matrix = [[0] * n_worker for _ in range(n_worker)]
def run_task(cmd):
    """Run `cmd` as a child process and return what it wrote.

    `cmd` is one command-line string.  It is tokenized with shlex.split and
    executed directly, without a shell.  The exit status is not inspected.

    Returns:
        A (stdout, stderr) pair with the captured output.
    """
    argv = shlex.split(cmd)
    child = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() waits for the child to exit and yields (stdout, stderr).
    return child.communicate()
# Example of the report text handled by parse_ib_write_output():
#
# RDMA_Write BW Test
# Dual-port : OFF Device : mlx5_0
# Number of qps : 1 Transport type : IB
# Connection type : RC Using SRQ : OFF
# CQ Moderation : 100
# Mtu : 1024[B]
# Link type : Ethernet
# GID index : 3
# Max inline data : 0[B]
# rdma_cm QPs : ON
# Data ex. method : rdma_cm
# Waiting for client rdma_cm QP to connect
# Please run the same command with the IB/RoCE interface IP
# local address: LID 0000 QPN 0x2e5e PSN 0x9078af
# GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:22:02
# remote address: LID 0000 QPN 0x036e PSN 0x472771
# GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:24:02
# #bytes #iterations BW peak[MB/sec] BW average[MB/sec] MsgRate[Mpps]
# 65536 305338 0.00 26.69 0.050913
def parse_ib_write_output(text):
    """Extract both endpoint addresses and the bandwidth from a report.

    Args:
        text: the report as one string.

    Returns:
        A (local_ip, remote_ip, bw) tuple.  Each IP is rebuilt from the last
        four colon-separated fields of the line that follows the "local" /
        "remote" address line; bw is the fourth column of the line that
        follows the "#bytes" header line.

    Raises:
        ValueError: if one of the three sections is absent or malformed.
    """
    lines = text.splitlines()

    def extract_ipv4(line):
        # Split on ':' instead of slicing a fixed number of characters, so
        # fields that are three digits wide (e.g. 192, 168) stay intact.
        fields = line.strip().split(':')[-4:]
        if len(fields) != 4:
            raise ValueError('malformed GID line: {!r}'.format(line))
        return '.'.join(str(int(field)) for field in fields)

    local_ip = remote_ip = bw = None
    # Every value of interest sits on the line right after its marker line,
    # so walk the lines pairwise; a marker on the very last line has no
    # value line and is reported as missing below.
    for marker_line, value_line in zip(lines, lines[1:]):
        head = marker_line.lstrip()
        if head.startswith('local'):
            local_ip = extract_ipv4(value_line)
        elif head.startswith('remote'):
            remote_ip = extract_ipv4(value_line)
        elif head.startswith('#bytes'):
            words = value_line.split()
            if len(words) < 4:
                raise ValueError('malformed bandwidth line: {!r}'.format(value_line))
            bw = words[3]

    if local_ip is None or remote_ip is None or bw is None:
        raise ValueError('report lacks a local address, remote address or bandwidth section')
    return local_ip, remote_ip, bw
def receiver_task(cmd, bw_matrix_ref):
    """Run `cmd`, parse its stdout as a report and record the bandwidth.

    `bw_matrix_ref` is a one-element sequence holding the matrix to update.
    The bandwidth is stored at [src_id][dst_id], where src_id is the id of
    the report's remote address and dst_id the id of its local address (ids
    are positions in hosts_avail).  The two ids must differ.
    """
    stdout_data = run_task(cmd)[0]
    local_ip, remote_ip, bw = parse_ib_write_output(stdout_data)
    # Resolve the sender first, then the receiver.
    src_id, dst_id = [get_id_from_ip(ip, hosts_avail) for ip in (remote_ip, local_ip)]
    assert src_id != dst_id, '({}, {}), ({}, {})'.format(remote_ip, src_id, local_ip, dst_id)
    matrix = bw_matrix_ref[0]
    matrix[src_id][dst_id] = bw
def emit_pair(client, server, port, duration=10):
    """Build the shell commands for one flow from `client` to `server`.

    Args:
        client: client host; only used to name the log files.
        server: server host; target of the client command and part of the
            log file names.
        port: port passed to -p on both sides.
        duration: value passed to the client's -D option.  Defaults to 10,
            the value that used to be hard-coded.

    Returns:
        A list of four strings, in this order: the server-side launch, the
        client-side launch, the command that prints the server log and the
        command that prints the client log.
    """
    server_log = '/tmp/start_flow_server_{}_{}_{}.txt'.format(client, server, port)
    client_log = '/tmp/start_flow_client_{}_{}_{}.txt'.format(client, server, port)
    cmd_s = 'ib_write_bw -d mlx5_0 -R -p {} --report_gbits > {} &'.format(port, server_log)
    cmd_c = 'ib_write_bw -d mlx5_0 -R -p {} --report_gbits -F -D {} {} > {} &'.format(
        port, duration, server, client_log)
    return [cmd_s, cmd_c, 'cat {}'.format(server_log), 'cat {}'.format(client_log)]
def append_cmds(cmds, host, cmd):
    """Queue `cmd` for `host` in `cmds`, a dict mapping host -> command list.

    A host seen for the first time gets a fresh list; later commands for the
    same host are appended after the ones already queued.
    """
    # setdefault replaces the has_key() test, which newer Python versions no
    # longer provide, and covers both the new-host and known-host case.
    cmds.setdefault(host, []).append(cmd)
def ssh_submit(cmds, ths):
    """Create one worker thread per host that runs the host's commands via ssh.

    For each (host, command list) item of `cmds`, the commands are joined
    with ';' and followed by a shell `wait`, then wrapped in an ssh
    invocation for that host.  The command line is printed, and a
    threading.Thread that calls run_task with it is appended to `ths`.
    Threads are only created here; none of them is started.
    """
    ssh_template = 'ssh -o StrictHostKeyChecking=no {} "{}"'
    for host, queued in cmds.items():
        remote_script = ';'.join(queued + ['wait'])
        ssh_cmd = ssh_template.format(host, remote_script)
        print(ssh_cmd)
        worker = threading.Thread(target=run_task, args=(ssh_cmd,))
        ths.append(worker)
def ssh_fetch(cmds):
    """Run each fetch command over ssh and record its report in bw_matrix.

    `cmds` is an iterable of [host, command] pairs.  Every ssh command line
    is printed before it is handed to receiver_task together with the
    module-level bw_matrix.
    """
    ssh_template = 'ssh -o StrictHostKeyChecking=no {} "{}"'
    for host, remote_cmd in cmds:
        ssh_cmd = ssh_template.format(host, remote_cmd)
        print(ssh_cmd)
        receiver_task(ssh_cmd, [bw_matrix])
def print_bw_matrix(title, bw_mat, hosts=None):
    """Print `title`, then `bw_mat` as a tab-separated, column-aligned table.

    The first table row is a header made of the literal text 'src\\dst' and
    the host names; each following row starts with the host name of that
    row.  Every column is padded to the width of its widest cell.

    Args:
        title: line printed before the table.
        bw_mat: sequence of rows; row i belongs to host i.
        hosts: host names used as row and column labels.  Defaults to the
            module-level hosts_avail.
    """
    if hosts is None:
        hosts = hosts_avail
    hosts = list(hosts)

    print(title)
    # The backslash is escaped explicitly; the printed header is unchanged.
    matrix = [['src\\dst'] + hosts]
    for i, row in enumerate(bw_mat):
        matrix.append([hosts[i]] + list(row))

    cells = [[str(entry) for entry in row] for row in matrix]
    widths = [max(len(cell) for cell in column) for column in zip(*cells)]
    fmt = '\t'.join('{{:{}}}'.format(width) for width in widths)
    print('\n'.join(fmt.format(*row) for row in cells))
def main():
    """Measure every ordered pair of the first four hosts and print the matrix.

    For each (client, server) pair with client != server, one flow is set up
    on its own port, counting up from 18000.  Once all flows have run, the
    server-side logs are fetched into bw_matrix and the matrix is printed.
    """
    ths_s = []
    ths_c = []
    servers = hosts_avail[0:4]
    clients = hosts_avail[0:4]
    cmds_s = {}
    cmds_c = {}
    fetch_cmds_s = []
    base_port = 18000
    for client in clients:
        for server in servers:
            if client == server:
                continue
            ret = emit_pair(client, server, base_port)
            append_cmds(cmds_s, server, ret[0])
            append_cmds(cmds_c, client, ret[1])
            fetch_cmds_s.append([server, ret[2]])
            base_port += 1

    ssh_submit(cmds_s, ths_s)
    ssh_submit(cmds_c, ths_c)

    # NOTE(review): the bodies of the three loops below were absent, which
    # left the threads created but never run.  start()/join() is the minimal
    # reading; confirm whether a pause between starting the servers and the
    # clients is also needed.
    for th in ths_s:
        th.start()
    for th in ths_c:
        th.start()
    for th in ths_s + ths_c:
        th.join()

    # NOTE(review): fetch_cmds_s was collected but never consumed, so the
    # matrix would have been printed with its initial zeros; fetching the
    # server logs here is presumed to be the intent.
    ssh_fetch(fetch_cmds_s)
    print_bw_matrix('Bandwidth Matrix', bw_matrix)
crazyboycjr commented Feb 9, 2020

One of the problems I encountered and diagnosed concerns DCQCN fairness. The DCQCN algorithm is implemented in the NIC firmware, so different firmware versions can cause this kind of issue, which is hard to find and locate.

