Skip to content

Instantly share code, notes, and snippets.

@crazyboycjr
Last active March 27, 2021 03:37
Show Gist options
  • Save crazyboycjr/24d784f6f84c895c1e89b4c0b6b6b02f to your computer and use it in GitHub Desktop.
Save crazyboycjr/24d784f6f84c895c1e89b4c0b6b6b02f to your computer and use it in GitHub Desktop.
start_flow.py
#!/usr/bin/env python3
import subprocess
import threading
import shlex
import time
import argparse
# Candidate test machines. main() takes the first two available hosts as
# clients and the remaining ones as servers.
hosts = [
    '192.168.211.2',
    '192.168.211.34',
    '192.168.211.130',
    '192.168.211.162',
]
# One flag per entry of `hosts`: non-zero keeps the host, 0 drops it
# (see get_avail_by_mask).
hosts_mask = [1, 1, 1, 1]
def get_avail_by_mask(hosts, mask):
    """Return, in order, the hosts whose corresponding mask entry is non-zero.

    Hosts and mask entries are paired with zip, so trailing entries of the
    longer sequence are ignored.
    """
    return [host for flag, host in zip(mask, hosts) if flag != 0]
hosts_avail = get_avail_by_mask(hosts, hosts_mask)
# Map each available host IP to its position in hosts_avail; the count equals
# the number of available hosts.
ip_to_id = {ip: idx for idx, ip in enumerate(hosts_avail)}
ip_to_id_cnt = len(hosts_avail)
def get_id_from_ip(ip, hosts):
    """Return the index of the first entry in *hosts* equal to *ip*.

    Raises:
        ValueError: if *ip* does not occur in *hosts*. An explicit raise is
            used instead of ``assert`` because asserts are stripped under
            ``python -O``, which would make this silently return None.
    """
    for i, h in enumerate(hosts):
        if h == ip:
            return i
    raise ValueError("no valid id found for ip: {}".format(ip))
def run_task(cmd):
    """Run *cmd* and wait for it to finish.

    *cmd* is split with shlex into an argv list (no shell is involved).
    Returns the captured ``(stdout, stderr)`` as bytes; the exit status is
    not checked.
    """
    completed = subprocess.run(
        shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return completed.stdout, completed.stderr
# Reference copy of an ib_write_bw server log, kept to document the format
# that parse_ib_write_output reads. That parser looks for lines starting with
# " local", " remote" and " #bytes" and reads the line after each.
# NOTE(review): the marker lines below show no leading space, which the parser
# requires; presumably it was lost when copying. Confirm against a real log.
# This sample reports MB/sec, while emit_pair passes --report_gbits, so real
# logs may label the columns differently. This is a bare module-level string
# expression with no runtime effect.
'''
---------------------------------------------------------------------------------------
RDMA_Write BW Test
Dual-port : OFF Device : mlx5_0
Number of qps : 1 Transport type : IB
Connection type : RC Using SRQ : OFF
CQ Moderation : 100
Mtu : 1024[B]
Link type : Ethernet
GID index : 3
Max inline data : 0[B]
rdma_cm QPs : ON
Data ex. method : rdma_cm
---------------------------------------------------------------------------------------
Waiting for client rdma_cm QP to connect
Please run the same command with the IB/RoCE interface IP
---------------------------------------------------------------------------------------
local address: LID 0000 QPN 0x2e5e PSN 0x9078af
GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:22:02
remote address: LID 0000 QPN 0x036e PSN 0x472771
GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:24:02
---------------------------------------------------------------------------------------
#bytes #iterations BW peak[MB/sec] BW average[MB/sec] MsgRate[Mpps]
65536 305338 0.00 26.69 0.050913
---------------------------------------------------------------------------------------
'''
def parse_ib_write_output(text):
    """Extract ``(local_ip, remote_ip, bw)`` from ib_write_bw output.

    Args:
        text: raw stdout bytes of ``ib_write_bw`` (as returned by run_task).

    Returns:
        A tuple of str ``(local_ip, remote_ip, bw)``.

        - The IPs are the last four colon-separated fields of the GID line
          that follows the " local" / " remote" address lines.
        - ``bw`` is the 4th whitespace-separated column of the line after
          the " #bytes" header.

    Raises:
        ValueError: if any of the three values cannot be found, e.g. because
            the remote command failed and printed nothing.
    """
    lines = text.splitlines()

    def extract_ipv4(line):
        # Split on ':' rather than slicing a fixed width so three-digit
        # octets (e.g. 192.168.x.x) are handled.
        return '.'.join(str(int(x)) for x in line.strip().split(b':')[-4:])

    local_ip = remote_ip = bw = None
    # Each marker is followed by the line holding its value; pairing every
    # line with its successor avoids indexing past the end of the output.
    for line, value_line in zip(lines, lines[1:]):
        if line.startswith(b' local'):
            local_ip = extract_ipv4(value_line)
        elif line.startswith(b' remote'):
            remote_ip = extract_ipv4(value_line)
        elif line.startswith(b' #bytes'):
            words = value_line.split()
            if len(words) > 3:
                bw = words[3].decode()
    if local_ip is None or remote_ip is None or bw is None:
        raise ValueError(
            'incomplete ib_write_bw output (local_ip={}, remote_ip={}, bw={}): {!r}'.format(
                local_ip, remote_ip, bw, text))
    return local_ip, remote_ip, bw
def receiver_task(cmd):
    """Run *cmd* and parse its stdout as ib_write_bw output.

    Returns whatever parse_ib_write_output returns for that stdout; the
    command's stderr is discarded.
    """
    stdout_data, _stderr = run_task(cmd)
    return parse_ib_write_output(stdout_data)
def emit_pair(client, server, port, duration):
    """Build the shell commands for one ib_write_bw flow from *client* to *server*.

    Returns a 4-element list:
        [0] server-side command, backgrounded, stdout redirected to a /tmp log;
        [1] client-side command that targets *server* with ``-D duration``,
            backgrounded, stdout redirected to a /tmp log;
        [2] command that prints the server log;
        [3] command that prints the client log.
    Both sides use *port*; callers are expected to give each flow its own port.
    """
    tag = '{}_{}_{}'.format(client, server, port)
    server_log = '/tmp/start_flow_server_' + tag + '.txt'
    client_log = '/tmp/start_flow_client_' + tag + '.txt'
    base = 'ib_write_bw -d mlx5_0 -R -p {} --report_gbits'.format(port)
    server_cmd = '{} > {} &'.format(base, server_log)
    client_cmd = '{} -F -D {} {} > {} &'.format(base, duration, server, client_log)
    return [server_cmd, client_cmd, 'cat ' + server_log, 'cat ' + client_log]
def append_cmds(cmds, host, cmd):
    """Append *cmd* to the list ``cmds[host]``, creating the list on first use."""
    cmds.setdefault(host, []).append(cmd)
def ssh_submit(cmds, ths):
    """Create (but do not start) one thread per host that runs its commands over ssh.

    Args:
        cmds: dict mapping host -> list of shell command strings to run there.
        ths: list to which the new ``threading.Thread`` objects are appended.

    The remote command line runs every queued command and ends with ``wait``.
    The ssh session therefore lasts until that host's background jobs finish.
    The ssh command line is printed before the thread is queued.
    """
    for host, host_cmds in cmds.items():
        # Commands from emit_pair already end in '&'. Joining them with ';'
        # would give "a &;b &;wait", which POSIX sh and bash reject as a
        # syntax error. So only add ';' to commands not already '&'-terminated.
        parts = [c if c.endswith('&') else c + ';'
                 for c in (raw.rstrip() for raw in host_cmds)]
        cmd_on_host = ' '.join(parts + ['wait'])
        cmd = 'ssh -p 22 -o StrictHostKeyChecking=no {} "{}"'.format(host, cmd_on_host)
        print(cmd)
        ths.append(threading.Thread(target=run_task, args=(cmd, )))
def ssh_fetch(cmds):
    """Run each ``[host, command]`` pair over ssh, one after another.

    Each ssh command line is printed before it runs. Returns the list of
    receiver_task results (parsed ib_write_bw output), in input order.
    """
    results = []
    for host, remote_cmd in cmds:
        ssh_cmd = 'ssh -p 22 -o StrictHostKeyChecking=no {} "{}"'.format(host, remote_cmd)
        print(ssh_cmd)
        results.append(receiver_task(ssh_cmd))
    return results
def main(args):
    """Run the competition experiment and print the server-side results.

    The experiment consists of:
      - one flow from clients[0] to servers[0];
      - ``args.num_competitors`` flows from clients[1] to servers[1].

    Each flow lasts ``args.duration``. Clients are the first two available
    hosts and servers are the rest.
    """
    clients = hosts_avail[:2]
    servers = hosts_avail[2:]
    server_cmds = {}
    client_cmds = {}
    fetch_cmds = []
    next_port = 18000

    def emit_flow(client, server):
        # Queue the server/client commands for one flow; every flow gets the
        # next port number.
        nonlocal next_port
        server_cmd, client_cmd, server_cat, _client_cat = emit_pair(
            client, server, next_port, args.duration)
        append_cmds(server_cmds, server, server_cmd)
        append_cmds(client_cmds, client, client_cmd)
        fetch_cmds.append([server, server_cat])
        next_port += 1

    emit_flow(clients[0], servers[0])
    for _ in range(args.num_competitors):
        emit_flow(clients[1], servers[1])

    server_threads = []
    client_threads = []
    ssh_submit(server_cmds, server_threads)
    ssh_submit(client_cmds, client_threads)

    # Servers are started first; the 5 s pause presumably lets them begin
    # listening before the clients connect.
    for th in server_threads:
        th.start()
    time.sleep(5)
    for th in client_threads:
        th.start()
    for th in server_threads + client_threads:
        th.join()

    print(ssh_fetch(fetch_cmds))
def add_args(parser):
    """Register this script's command-line options on the argparse *parser*."""
    options = (
        (('-n', '--num-competitors'),
         dict(type=int, default=1, help='specify the number of competitor flows')),
        (('-D', '--duration'),
         dict(type=int, default=10, help='flow duration (in seconds)')),
    )
    for flags, kwargs in options:
        parser.add_argument(*flags, **kwargs)
# Command-line entry: build the parser, register the options, parse sys.argv
# and run the experiment (this executes at import time as well).
parser = argparse.ArgumentParser(
    description="Bandwidth allocation test.",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
add_args(parser)
args = parser.parse_args()
main(args)
#!/usr/bin/env python2
import subprocess
import threading
import shlex
import time
# All known test hosts; hosts_mask selects which of them take part.
hosts = [
    '10.0.11.2',
    '10.0.13.2',
    '10.0.15.3',
    '10.0.17.2',
    '10.0.22.2',
    '10.0.24.2',
    '10.0.26.2',
    '10.0.28.2',
]
# One flag per entry of `hosts`: non-zero keeps the host, 0 drops it.
# The commented-out alternative selects the first four hosts instead.
hosts_mask = [0, 0, 0, 0, 1, 1, 1, 1]
# hosts_mask = [1, 1, 1, 1, 0, 0, 0, 0]
def get_avail_by_mask(hosts, mask):
    """Return, in order, the hosts whose corresponding mask entry is non-zero.

    Hosts and mask entries are paired with zip, so trailing entries of the
    longer sequence are ignored. An all-zero (or empty) mask yields [].
    The previous ``zip(*...)[1]`` form raised IndexError in that case and
    only worked on Python 2.
    """
    return [host for flag, host in zip(mask, hosts) if flag != 0]
hosts_avail = get_avail_by_mask(hosts, hosts_mask)
# Map each available host IP to its position in hosts_avail; the count equals
# the number of available hosts.
ip_to_id = {ip: idx for idx, ip in enumerate(hosts_avail)}
ip_to_id_cnt = len(hosts_avail)
def get_id_from_ip(ip, hosts):
    """Return the index of the first entry in *hosts* equal to *ip*.

    Raises:
        ValueError: if *ip* does not occur in *hosts*. An explicit raise is
            used instead of ``assert`` because asserts are stripped under
            ``python -O``, which would make this silently return None.
    """
    for i, h in enumerate(hosts):
        if h == ip:
            return i
    raise ValueError("no valid id found for ip: {}".format(ip))
# Square bandwidth matrix indexed [src_id][dst_id] by position in hosts_avail;
# entries never filled in by receiver_task stay 0.
n_worker = len(hosts_avail)
bw_matrix = [[0] * n_worker for _ in range(n_worker)]
def run_task(cmd):
    """Run *cmd* and wait for it to finish.

    *cmd* is split with shlex into an argv list (no shell is involved).
    Returns the captured ``(stdout, stderr)``; the exit status is not checked.
    """
    proc = subprocess.Popen(shlex.split(cmd),
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    stdout_data, stderr_data = proc.communicate()
    return stdout_data, stderr_data
# Reference copy of an ib_write_bw server log, kept to document the format
# that parse_ib_write_output reads. That parser looks for lines starting with
# " local", " remote" and " #bytes" and reads the line after each.
# NOTE(review): the marker lines below show no leading space, which the parser
# requires; presumably it was lost when copying. Confirm against a real log.
# This sample reports MB/sec, while emit_pair passes --report_gbits, so real
# logs may label the columns differently. This is a bare module-level string
# expression with no runtime effect.
'''
---------------------------------------------------------------------------------------
RDMA_Write BW Test
Dual-port : OFF Device : mlx5_0
Number of qps : 1 Transport type : IB
Connection type : RC Using SRQ : OFF
CQ Moderation : 100
Mtu : 1024[B]
Link type : Ethernet
GID index : 3
Max inline data : 0[B]
rdma_cm QPs : ON
Data ex. method : rdma_cm
---------------------------------------------------------------------------------------
Waiting for client rdma_cm QP to connect
Please run the same command with the IB/RoCE interface IP
---------------------------------------------------------------------------------------
local address: LID 0000 QPN 0x2e5e PSN 0x9078af
GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:22:02
remote address: LID 0000 QPN 0x036e PSN 0x472771
GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:24:02
---------------------------------------------------------------------------------------
#bytes #iterations BW peak[MB/sec] BW average[MB/sec] MsgRate[Mpps]
65536 305338 0.00 26.69 0.050913
---------------------------------------------------------------------------------------
'''
def parse_ib_write_output(text):
    """Extract ``(local_ip, remote_ip, bw)`` from ib_write_bw output text.

    Args:
        text: ib_write_bw stdout as a (native) str.

    Returns:
        A tuple of str ``(local_ip, remote_ip, bw)``.

        - The IPs are the last four colon-separated fields of the GID line
          that follows the " local" / " remote" address lines.
        - ``bw`` is the 4th whitespace-separated column of the line after
          the " #bytes" header.

    Raises:
        ValueError: if any of the three values cannot be found, e.g. because
            the remote command failed and printed nothing.
    """
    lines = text.splitlines()

    def extract_ipv4(line):
        # Split on ':' instead of slicing the last 11 characters. The fixed
        # slice only fits four two-digit octets and mangles addresses such as
        # 192.168.211.130.
        return '.'.join(str(int(x)) for x in line.strip().split(':')[-4:])

    local_ip = remote_ip = bw = None
    # Each marker is followed by the line holding its value; pairing every
    # line with its successor avoids indexing past the end of the output.
    for line, value_line in zip(lines, lines[1:]):
        if line.startswith(' local'):
            local_ip = extract_ipv4(value_line)
        elif line.startswith(' remote'):
            remote_ip = extract_ipv4(value_line)
        elif line.startswith(' #bytes'):
            words = value_line.split()
            if len(words) > 3:
                bw = words[3]
    if local_ip is None or remote_ip is None or bw is None:
        raise ValueError(
            'incomplete ib_write_bw output (local_ip={}, remote_ip={}, bw={}): {!r}'.format(
                local_ip, remote_ip, bw, text))
    return local_ip, remote_ip, bw
def receiver_task(cmd, bw_matrix_ref):
    """Run *cmd*, parse its stdout as ib_write_bw output, and record the bandwidth.

    *bw_matrix_ref* is a one-element list wrapping the matrix. The entry
    ``[src][dst]`` is set to the parsed bandwidth string, where:
      - src is the index of the remote IP in hosts_avail;
      - dst is the index of the local IP in hosts_avail.
    An assert guards against src and dst being the same host.
    """
    stdout_data = run_task(cmd)[0]
    local_ip, remote_ip, bw = parse_ib_write_output(stdout_data)
    src_id = get_id_from_ip(remote_ip, hosts_avail)
    dst_id = get_id_from_ip(local_ip, hosts_avail)
    assert src_id != dst_id, '({}, {}), ({}, {})'.format(remote_ip, src_id, local_ip, dst_id)
    matrix = bw_matrix_ref[0]
    matrix[src_id][dst_id] = bw
def emit_pair(client, server, port, duration=10):
    """Build the shell commands for one ib_write_bw flow from *client* to *server*.

    Args:
        client, server: host addresses; the client command targets *server*.
        port: port used by both sides; callers give each flow its own port.
        duration: value passed to the client's ``-D`` option. Defaults to 10,
            the value this function previously hard-coded.

    Returns:
        A 4-element list:
        [0] server command, backgrounded, stdout redirected to a /tmp log;
        [1] client command, backgrounded, stdout redirected to a /tmp log;
        [2] command that prints the server log;
        [3] command that prints the client log.
    """
    server_log = '/tmp/start_flow_server_{}_{}_{}.txt'.format(client, server, port)
    client_log = '/tmp/start_flow_client_{}_{}_{}.txt'.format(client, server, port)
    cmd_s = 'ib_write_bw -d mlx5_0 -R -p {} --report_gbits > {} &'.format(port, server_log)
    cmd_c = 'ib_write_bw -d mlx5_0 -R -p {} --report_gbits -F -D {} {} > {} &'.format(
        port, duration, server, client_log)
    return [cmd_s, cmd_c, 'cat {}'.format(server_log), 'cat {}'.format(client_log)]
def append_cmds(cmds, host, cmd):
    """Append *cmd* to the list ``cmds[host]``, creating the list on first use.

    Uses ``dict.setdefault`` instead of ``dict.has_key``, which no longer
    exists in Python 3.
    """
    cmds.setdefault(host, []).append(cmd)
def ssh_submit(cmds, ths):
    """Create (but do not start) one thread per host that runs its commands over ssh.

    Args:
        cmds: dict mapping host -> list of shell command strings to run there.
        ths: list to which the new ``threading.Thread`` objects are appended.

    The remote command line runs every queued command and ends with ``wait``.
    The ssh session therefore lasts until that host's background jobs finish.
    The ssh command line is printed before the thread is queued.
    """
    for host, host_cmds in cmds.items():
        # Commands from emit_pair already end in '&'. Joining them with ';'
        # would give "a &;b &;wait", which POSIX sh and bash reject as a
        # syntax error. So only add ';' to commands not already '&'-terminated.
        parts = [c if c.endswith('&') else c + ';'
                 for c in (raw.rstrip() for raw in host_cmds)]
        cmd_on_host = ' '.join(parts + ['wait'])
        cmd = 'ssh -o StrictHostKeyChecking=no {} "{}"'.format(host, cmd_on_host)
        print(cmd)
        ths.append(threading.Thread(target=run_task, args=(cmd, )))
def ssh_fetch(cmds):
    """Run each ``[host, command]`` pair over ssh, one after another.

    Each ssh command line is printed. Its output is handed to receiver_task,
    which records the parsed bandwidth into the module-level bw_matrix.
    Returns None.
    """
    for host, remote_cmd in cmds:
        ssh_cmd = 'ssh -o StrictHostKeyChecking=no {} "{}"'.format(host, remote_cmd)
        print(ssh_cmd)
        receiver_task(ssh_cmd, [bw_matrix])
def print_bw_matrix(title, bw_mat, hosts=None):
    """Print *title* followed by *bw_mat* as a tab-separated table.

    Args:
        title: heading printed on its own line.
        bw_mat: square matrix; row i holds the values for source hosts[i].
        hosts: row/column labels. Defaults to the module-level hosts_avail.

    Each column is left-aligned and padded to its widest cell.
    """
    if hosts is None:
        hosts = hosts_avail
    print(title)
    # '\\' spells a literal backslash; the old 'src\dst' relied on '\d' being
    # an unknown escape, which newer Pythons warn about. The value is unchanged.
    rows = [['src\\dst'] + list(hosts)]
    rows.extend([host] + list(row) for host, row in zip(hosts, bw_mat))
    cells = [[str(e) for e in row] for row in rows]
    widths = [max(len(c) for c in col) for col in zip(*cells)]
    fmt = '\t'.join('{{:{}}}'.format(w) for w in widths)
    print('\n'.join(fmt.format(*row) for row in cells))
def main():
    """Launch one flow for every ordered pair of distinct hosts.

    The hosts are the first four entries of hosts_avail. After all flows end,
    the server logs are collected and the bandwidth matrix is printed.
    """
    participants = hosts_avail[:4]
    server_cmds = {}
    client_cmds = {}
    fetch_cmds = []
    port = 18000
    for client in participants:
        for server in participants:
            if client != server:
                server_cmd, client_cmd, server_cat = emit_pair(client, server, port)[:3]
                append_cmds(server_cmds, server, server_cmd)
                append_cmds(client_cmds, client, client_cmd)
                fetch_cmds.append([server, server_cat])
                port += 1  # every flow gets its own port

    server_threads = []
    client_threads = []
    ssh_submit(server_cmds, server_threads)
    ssh_submit(client_cmds, client_threads)
    # Servers are started first; the 1 s pause presumably lets them begin
    # listening before the clients connect.
    for th in server_threads:
        th.start()
    time.sleep(1)
    for th in client_threads:
        th.start()
    for th in server_threads + client_threads:
        th.join()

    ssh_fetch(fetch_cmds)
    print_bw_matrix('Bandwidth Matrix', bw_matrix)
# Run the all-pairs measurement. There is no __main__ guard, so this also
# runs when the module is imported.
main()
@crazyboycjr
Copy link
Author

crazyboycjr commented Feb 9, 2020

One of the problems I encountered and diagnosed involves DCQCN fairness. The DCQCN algorithm is implemented in the NIC firmware, so different firmware versions can cause fairness issues like this one, which are hard to detect and pinpoint.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment