Created November 30, 2014 08:57
Urbit UDP sniffer
# Copyright (c) 2014 Wladimir J. van der Laan, Visucore
# Distributed under the MIT software license, see
urbit UDP sniffer
Usage: [-p <port1>-<port2>,<port3>,...] [-i <interface>]
import socket, struct, sys, io, argparse
from struct import pack,unpack
from binascii import b2a_hex
class Args: # default args
# interface we're interested in
interface = b'eth0'
# ports we're interested in
ports = set(list(range(4000,4008)) + [13337, 41954])
# constants...
CRYPTOS = {0:'%none', 1:'%open', 2:'%fast', 3:'%full'}
ETH_P_ALL = 0x0300 # =htons(0x03) from /usr/include/linux/if_ether.h
# utilities...
def ipv4str(addr):
'''Bytes to IPv4 address'''
return '.'.join(['%i' % i for i in addr])
def crypto_name(x):
'''Name for crypto algo'''
if x in CRYPTOS:
return CRYPTOS[x]
return 'unk%02i' % x
def varnum_le(x):
'''Little-endian bytes to long'''
rv = 0
for i,b in enumerate(x):
rv |= b << (i*8)
return rv
def hexstr(x):
'''Bytes to hex string'''
return b2a_hex(x).decode()
def format_ship(x):
'''Format ship id'''
# @todo: ~name mangling?
if x < 0x100: # 8-bit
return '%02x' % (x,)
elif x < 0x10000: # 16-bit
return '%04x' % (x,)
elif x < 0x100000000: # 32-bit
return '%04x.%04x' % (x>>16, x&0xffff)
elif x < 0x10000000000000000: # 64-bit
return '%04x.%04x.%04x.%04x' % ((x>>48)&0xffff, (x>>32)&0xffff, (x>>16)&0xffff, x&0xffff)
else: # 128-bit
return '%04x.%04x.%04x.%04x.%04x.%04x.%04x.%04x' % (
(x>>112)&0xffff, (x>>96)&0xffff, (x>>80)&0xffff, (x>>64)&0xffff,
(x>>48)&0xffff, (x>>32)&0xffff, (x>>16)&0xffff, x&0xffff)
def colorize(str, col):
return ('\x1b[38;5;%im' % col) + str + ('\x1b[0m')
# cli colors and glyphs
v_arrow = colorize('→', 240)
v_attention = colorize('>', 34) + colorize('>', 82) + colorize('>', 118)
v_colon = colorize(':', 240)
v_equal = colorize('=', 245)
def parse_args():
args = Args()
parser = argparse.ArgumentParser(description='Urbit sniffer. Dump incoming and outgoing urbit packets.')
pdefault = '4000-4007,13337,41954' # update this when Args changes...
idefault = args.interface.decode()
parser.add_argument('-p, --ports', dest='ports', help='Ports to listen on (default: '+pdefault+')')
parser.add_argument('-i, --interface', dest='interface', help='Interface to listen on (default:'+idefault+')', default=idefault)
r = parser.parse_args()
args.interface = r.interface.encode()
if r.ports is not None:
args.ports = set()
for t in r.ports.split(','):
(a,_,b) = t.partition('-')
ai = int(a)
bi = int(b) if b else ai
args.ports.update(list(range(int(ai), int(bi)+1)))
return args
def main(args):
# would be better as filtering happens in the kernel but doesn't catch outgoing packets:
#s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
s = socket.socket(socket.PF_PACKET, socket.SOCK_DGRAM, ETH_P_ALL)
print(v_attention + ' Listening on ' + args.interface.decode() + ' ports ' + (',').join(str(x) for x in args.ports))
s.setsockopt(socket.SOL_SOCKET,SO_BINDTODEVICE,pack("%ds" % (len(args.interface)+1,), args.interface))
while True:
packet = s.recvfrom(65565)
packet = packet[0] # drop extra information
# IP header
iph = unpack('!BBHHHBBH4s4s', packet[0:20])
ihl = (iph[0] & 15)*4
if ihl < 20: # cannot handle IP headers <20 bytes
# print("Warn: invhdr")
protocol = iph[6]
srcaddr = iph[8]
dstaddr = iph[9]
if protocol != 17: # not UDP or no place for header
#print("Warn: invproto")
# UDP header
(sport, dport, ulength, uchecksum) = unpack('!HHHH', packet[ihl:ihl+8])
data = packet[ihl+8:ihl+ulength]
if len(data) != (ulength-8):
print("Warn: invlength")
continue # invalid length packet
if dport not in args.ports and sport not in args.ports: # only urbit ports
# Urbit header and payload
urhdr = unpack('<L', data[0:4])[0]
proto = urhdr & 7
mug = (urhdr >> 3) & 0xfffff
yax = (urhdr >> 23) & 3
yax_bytes = 1<<(yax+1)
qax = (urhdr >> 25) & 3
qax_bytes = 1<<(qax+1)
crypto = (urhdr >> 27)
sender = varnum_le(data[4:4+yax_bytes])
receiver = varnum_le(data[4+yax_bytes:4+yax_bytes+qax_bytes])
payload = data[4+yax_bytes+qax_bytes:]
except (IndexError, struct.error):
print('Warn: invpkt')
# Print packet
(sport, dport, ulength, uchecksum) = unpack('>HHHH', packet[ihl:ihl+8])
print( colorize(ipv4str(srcaddr), COLOR_IP) + v_colon + colorize(str(sport), COLOR_IP) + ' ' +
v_arrow + ' ' +
colorize(ipv4str(dstaddr), COLOR_IP) + v_colon + colorize(str(dport), COLOR_IP) + ' ' +
colorize('proto', COLOR_HEADER) + v_equal + colorize(str(proto), COLOR_VALUE) + ' ' +
colorize('mug', COLOR_HEADER) + v_equal + colorize('%05x' % mug, COLOR_VALUE) + ' ' +
colorize('crypto', COLOR_HEADER) + v_equal + colorize(crypto_name(crypto), COLOR_VALUE) + ' ' +
colorize('sender', COLOR_HEADER) + v_equal + colorize(format_ship(sender), COLOR_VALUE) + ' ' +
colorize('receiver', COLOR_HEADER) + v_equal + colorize(format_ship(receiver), COLOR_VALUE))
print(' ' + colorize(hexstr(payload), COLOR_DATA))
if __name__ == '__main__':
# Force UTF8 out
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf8', line_buffering=True)
sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf8', line_buffering=True)
except KeyboardInterrupt:
