Last active
June 22, 2021 16:37
-
-
Save maxfischer2781/38e7af51cc25b5a19d2b0f451a6b5dc3 to your computer and use it in GitHub Desktop.
Utility to record/replay UDP datagrams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from typing import BinaryIO | |
import socket | |
import argparse | |
import pathlib | |
import struct | |
import time | |
# Command-line interface of the tape recorder. The raw description formatter
# keeps the ASCII-art banner from being re-wrapped in the ``--help`` output.
CLI = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description="\n".join(
        [
            "Utility to record/replay UDP datagrams",
            r" ___________ ",
            r" --\ |.UDP.Tape..| --\ ",
            r" --/ | ()___() | --/ ",
            r" |__/_____\__|",
        ]
    ),
)

# Positional arguments, in the order they are expected on the command line.
CLI.add_argument("PORT", type=int, help="Local UDP port where packets arrive")
CLI.add_argument("STORE", type=pathlib.Path, help="Path at which to store packets")
CLI.add_argument(
    "DIRECTION",
    choices=["record", "replay"],
    help="Whether to 'record' or 'replay' packets",
)

# Optional switches.
CLI.add_argument(
    "-ff", "--fast-forward", action="store_true", help="Ignore delays during replay"
)
CLI.add_argument(
    "--duration",
    type=float,
    default=float("inf"),
    help="How long to record/replay [default: inf]",
)
def main(argv=None):
    """Parse the command line and run the requested record/replay action.

    :param argv: list of arguments to parse instead of ``sys.argv[1:]``;
        the default ``None`` keeps the regular command-line behaviour
    :raises ValueError: if the parsed direction is neither ``"record"`` nor
        ``"replay"``

    A ``KeyboardInterrupt`` (Ctrl+C) is treated as the regular way to stop an
    ongoing recording or replay and is deliberately not reported as an error.
    """
    options = CLI.parse_args(argv)
    port, store, direction = options.PORT, options.STORE, options.DIRECTION
    duration, fast_forward = options.duration, options.fast_forward
    try:
        if direction == "record":
            record(port, store, duration)
        elif direction == "replay":
            replay(port, store, duration, fast_forward)
        else:
            # ``!r`` is the repr *conversion*; ``:r`` would be a format spec,
            # which is invalid for strings and masks the intended message.
            raise ValueError(f"Unknown direction {direction!r}")
    except KeyboardInterrupt:
        # interrupting is the expected way to end an unbounded session
        pass
# Header stored in front of each recorded message:
# - packet size (uint16)
# - packet delay (double), the time.monotonic() difference to the previous packet
# The "!" prefix selects network byte order with standard sizes and no padding.
HEADER = struct.Struct("!Hd")
def record(port: int, location: pathlib.Path, duration: float):
    """Append UDP datagrams arriving on ``localhost:port`` to a tape file.

    Every datagram is written as one ``HEADER`` (payload size, delay since the
    previous datagram or since the start of recording) directly followed by
    the payload.

    :param port: local UDP port to bind to and receive from
    :param location: tape file; opened in append mode, existing content is kept
    :param duration: number of seconds to record; ``float('inf')`` records
        until interrupted

    Recording stops once ``duration`` has elapsed even if no datagram arrives;
    datagrams arriving after the deadline are not recorded.
    """
    # Longest single blocking wait. Capping it keeps the socket timeout a
    # valid finite number even for an infinite (or absurdly large) duration.
    max_wait = 60.0
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(('localhost', port))
        with location.open(mode="ab") as out_stream:
            previous = time.monotonic()
            until = previous + duration
            while True:
                remaining = until - time.monotonic()
                # "not >" instead of "<=" so that a NaN duration also stops
                if not remaining > 0:
                    break
                # Without a timeout, recv would block past the deadline
                # whenever no further datagram arrives.
                sock.settimeout(min(remaining, max_wait))
                try:
                    message = sock.recv(64*1024)  # unlikely, but maximum size
                except socket.timeout:
                    # nothing arrived in this slice; re-check the deadline
                    continue
                current = time.monotonic()
                # make sure there is only one write for both size + payload
                out_stream.write(
                    HEADER.pack(len(message), current-previous) + message
                )
                previous = current
                if __debug__:
                    print('record', message[:96], '...' if len(message) > 96 else '')
def read_messages(in_stream: BinaryIO):
    """Yield ``(delay, message)`` pairs for each datagram stored on a tape.

    :param in_stream: binary stream of ``HEADER`` + payload records
    :return: generator of ``(delay, message)`` tuples in stored order

    The delay of the first message is replaced by ``0`` so that a replay
    starts immediately. Reading stops at the end of the stream; a trailing
    record whose header or payload is incomplete is dropped.
    """
    header_size = HEADER.size
    is_first = True
    while True:
        raw_header = in_stream.read(header_size)
        if len(raw_header) < header_size:
            # clean end of tape, or a header cut off mid-write
            return
        size, delay = HEADER.unpack(raw_header)
        message = in_stream.read(size)
        # Compare lengths instead of testing for b'': a recorded zero-length
        # datagram is a valid message and must not end the iteration.
        if len(message) < size:
            # payload cut off mid-write; do not replay a partial datagram
            return
        yield (0 if is_first else delay), message
        is_first = False
def replay(port: int, location: pathlib.Path, duration: float, fast_forward: bool):
    """Send the datagrams stored on a tape to ``localhost:port``.

    :param port: UDP port on localhost that receives the datagrams
    :param location: tape file to read the datagrams from
    :param duration: stop once the summed recorded delays exceed this value
    :param fast_forward: if true, send without sleeping between datagrams
    """
    target = ('localhost', port)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock, \
            location.open(mode="rb") as tape:
        elapsed = 0.0
        for pause, datagram in read_messages(tape):
            # the time budget is counted in recorded delays, so it applies
            # the same way whether or not the pauses are actually slept
            elapsed += pause
            if elapsed > duration:
                break
            if not fast_forward:
                time.sleep(pause)
            if __debug__:
                tail = '...' if len(datagram) > 96 else ''
                print('replay', len(datagram), datagram[:96], tail)
            sock.sendto(datagram, target)
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment