Last active
January 6, 2023 18:54
-
-
Save siku2/a33be938efb2b9bd54c4d3d7e9c004db to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import concurrent.futures | |
import dataclasses | |
import re | |
import subprocess | |
import sys | |
import typing | |
import urllib.parse | |
import urllib.request | |
from http.client import HTTPResponse | |
_SERVER_BASE_URL = "http://localhost:19080" | |
_RTT_URL = f"{_SERVER_BASE_URL}/rtt.htm" | |
_RTT_SSE_OUTPUT_URL = f"{_SERVER_BASE_URL}/TabX_SSEOutput.cgi?rtt" | |
class Sse:
    """Common base type for every item parsed out of the SSE stream."""
@dataclasses.dataclass(kw_only=True)
class Event(Sse):
    """A named SSE event together with its raw payload.

    Instances must be built with keyword arguments (``kw_only=True``).
    """

    # value of the ``event`` field, already decoded to text
    kind: str
    # raw, undecoded value of the ``data`` field
    data: bytes
@dataclasses.dataclass(kw_only=True)
class Retry(Sse):
    """The value of an SSE ``retry`` field.

    Instances must be built with keyword arguments (``kw_only=True``).
    """

    # integer parsed from the ``retry`` line
    # NOTE(review): presumably a reconnection delay in milliseconds, as in the
    # SSE format - nothing in this file consumes it, so confirm before relying on it
    value: int
# Upper bound passed to ``readline``; a longer line is returned in pieces.
_MAX_LINE_LEN = 8 * 1024 * 1024


async def _sse_line_reader() -> typing.AsyncIterator[tuple[str | None, bytes]]:
    """Stream the RTT SSE endpoint line by line as ``(field, value)`` pairs.

    The blocking ``urllib`` response is read on a dedicated single-thread
    executor so the event loop is never blocked.

    Yields:
        ``(None, b"")`` for a blank line, otherwise the ASCII-decoded field
        name and the raw value bytes. The field name is everything before the
        first ``:``; one optional space after the colon is dropped from the
        value. Lines that start with ``:`` (SSE comments) are skipped.

    Raises:
        AssertionError: if the server answers with a status other than 200.
        UnicodeDecodeError: if a field name is not ASCII.
    """
    loop = asyncio.get_running_loop()
    # single-threaded executor: the blocking generator below is created and
    # resumed on the same worker thread every time
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def read_lines() -> typing.Iterator[bytes]:
        resp: HTTPResponse
        with urllib.request.urlopen(_RTT_SSE_OUTPUT_URL) as resp:
            assert resp.status == 200
            while True:
                line = resp.readline(_MAX_LINE_LEN)
                if line == b"":
                    # EOF: the server closed the stream
                    break
                # Strip only a real line terminator. A blind ``line[:-1]``
                # would eat a data byte when the line has no newline (last
                # line, or a line cut at _MAX_LINE_LEN) and would leave a
                # stray "\r" behind on CRLF streams.
                yield line.removesuffix(b"\n").removesuffix(b"\r")

    try:
        # create the iterator in the thread
        line_iter = await loop.run_in_executor(executor, read_lines)
        sentinel = object()
        while True:
            line: typing.Any = await loop.run_in_executor(
                executor, next, line_iter, sentinel
            )
            if line is sentinel:
                break
            if line == b"":
                yield None, b""
                continue
            if line.startswith(b":"):
                # comment line, carries no field
                continue
            # the space after the colon is optional in the SSE format
            key, _, value = line.partition(b":")
            yield key.decode("ascii"), value.removeprefix(b" ")
    finally:
        # release the worker thread once the stream ends or the consumer
        # stops iterating; don't block the event loop waiting for it
        executor.shutdown(wait=False)
async def _iter_sse() -> typing.AsyncIterator[Sse]:
    """Turn the raw ``(field, value)`` line stream into ``Sse`` objects.

    Yields:
        ``Retry`` for each ``retry`` line and ``Event`` for each ``event``
        line; the event payload is taken from the line that follows it.

    Raises:
        AssertionError: if the line after an ``event`` line is not ``data``.
        TypeError: for any other field name.
    """
    lines = _sse_line_reader()
    async for field, payload in lines:
        if field is None:
            # blank line, nothing to emit
            continue

        if field == "retry":
            yield Retry(value=int(payload))
        elif field == "event":
            event_kind = payload.decode("utf-8")
            # pull the following line from the same iterator; it must hold the data
            data_field, data_payload = await anext(lines)
            assert data_field == "data"
            yield Event(kind=event_kind, data=data_payload)
        else:
            raise TypeError(f"unknown type: {field}")
async def _rtt_post(data: dict[str, str]) -> None:
    """POST *data* as a URL-encoded form to the RTT page.

    The blocking request runs on the event loop's default executor.

    Raises:
        AssertionError: if the response status is not 200.
    """
    form_body = urllib.parse.urlencode(data).encode("utf-8")

    def send_blocking() -> None:
        response: HTTPResponse
        with urllib.request.urlopen(_RTT_URL, data=form_body) as response:
            assert response.status == 200

    await asyncio.get_running_loop().run_in_executor(None, send_blocking)
async def rtt_start_stop() -> None:
    """Log the action and POST the start/stop form fields to the RTT page."""
    print("INFO: starting RTT")
    # NOTE(review): presumably equivalent to pressing the page's start/stop
    # button with an empty address field - confirm against the server UI
    form = {"RTT_RTTAddrSet": "", "RTT_BtnStartStop": ""}
    await _rtt_post(form)
async def rtt_set_hex_format() -> None:
    """Log the action and POST data-format option ``"0"`` to the RTT page."""
    print("INFO: setting format to HEX")
    form = {"RTT_ComboBoxDataFormat": "0"}
    await _rtt_post(form)
async def rtt_clear() -> None:
    """Log the action and POST the clear-receive-buffer field to the RTT page."""
    print("INFO: clearing buffer")
    form = {"RTT_BtnClearRx": "1"}
    await _rtt_post(form)
# server caps out at 64 KiB | |
_MAX_LEN_BEFORE_CLEAR = 48 * 1024 | |
_RE_HEX_BYTE = re.compile( | |
rb"(?:^|\s|>)\b([0-9A-F]{2})(?=\s|$)", re.MULTILINE | re.IGNORECASE | |
) | |
async def _defmt_feeder( | |
defmt_proc: subprocess.Popen[bytes], | |
) -> typing.AsyncGenerator[None, bytes | None]: | |
assert defmt_proc.stdin | |
prev_match_count = 0 | |
clearing_fut: asyncio.Future[None] | None = None | |
while True: | |
rtt_hex = yield | |
assert isinstance(rtt_hex, bytes) | |
hex_byte_matches: list[bytes] = _RE_HEX_BYTE.findall(rtt_hex) | |
match_count = len(hex_byte_matches) | |
if match_count > prev_match_count: | |
new_hex_byte_matches = hex_byte_matches[prev_match_count:] | |
elif match_count < prev_match_count: | |
# assume we were cleared, so all the bytes we have can be considered new | |
new_hex_byte_matches = hex_byte_matches | |
else: | |
# same length, nothing new | |
new_hex_byte_matches = [] | |
prev_match_count = match_count | |
if prev_match_count > _MAX_LEN_BEFORE_CLEAR: | |
if clearing_fut is None or clearing_fut.done(): | |
clearing_fut = asyncio.create_task(rtt_clear()) | |
if new_hex_byte_matches: | |
rtt_data = bytes(map(lambda b: int(b, 16), new_hex_byte_matches)) | |
defmt_proc.stdin.write(rtt_data) | |
defmt_proc.stdin.flush() | |
async def main():
    """Pipe RTT data from the SSE stream into a ``defmt-print`` child process.

    ``sys.argv[1]`` is passed to ``defmt-print -e``. For every
    ``RTT_SSE_CSVPageData`` event the ``$``-separated payload is inspected:
    RTT is started if field 1 is not ``1``, the HEX format is selected if
    field 21 does not mark option 0 as selected, control-block progress is
    logged based on field 7, and field 23 (the hex dump) is handed to the
    feeder. Each ``wait_*`` flag ensures a request is sent only once; further
    events are skipped until the server reports the new state.

    Raises:
        SystemExit: with a usage message when no target argument is given.
    """
    if len(sys.argv) < 2:
        raise SystemExit(f"usage: {sys.argv[0]} <target>")
    target = sys.argv[1]

    # The context manager closes the child's stdin and waits for it on every
    # exit path, so defmt-print is not left behind when the stream ends or an
    # error propagates.
    with subprocess.Popen(
        ["defmt-print", "-e", target],
        stdin=subprocess.PIPE,
    ) as defmt_proc:
        defmt_feeder = _defmt_feeder(defmt_proc)
        # prime the generator so it is parked at its first ``yield``
        await defmt_feeder.asend(None)

        wait_is_running = False
        wait_is_hex = False
        wait_control_block = False
        async for sse in _iter_sse():
            if not (isinstance(sse, Event) and sse.kind == "RTT_SSE_CSVPageData"):
                continue

            fields = sse.data.split(b"$")

            is_running = fields[1] == b"1"
            if not is_running:
                if wait_is_running:
                    # start already requested; wait for the server to catch up
                    continue
                await rtt_start_stop()
                wait_is_running = True
            else:
                wait_is_running = False

            data_format_hex = b";0|selected" in fields[21]
            if not data_format_hex:
                if wait_is_hex:
                    continue
                await rtt_set_hex_format()
                wait_is_hex = True
            else:
                wait_is_hex = False

            looking_for_control_block = fields[7] == b"Not valid (yet)"
            if looking_for_control_block:
                if wait_control_block:
                    continue
                print("INFO: searching for control block...")
                wait_control_block = True
            elif wait_control_block:
                print("INFO: control block found")
                wait_control_block = False

            rtt_hex = fields[23]
            await defmt_feeder.asend(rtt_hex)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # The stream loop only ends by itself when the server closes the
        # connection, so Ctrl-C is the usual way out: exit without a traceback,
        # using the conventional 128 + SIGINT status.
        sys.exit(130)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment