Created
October 11, 2023 18:18
-
-
Save hemna/ad07b5299dddcf01b2ea11fd1d491d16 to your computer and use it in GitHub Desktop.
tail direwolf log and convert to aprsd Packet objects.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Test script that uses asyncio to
# tail a direwolf logfile looking for RX/TX APRS packets
# and then convert those to APRSD Packet objects.
#
#
# 1) create virtualenv
#    python -m venv .venv
# 2) source venv
#    source .venv/bin/activate
# 3) install aprsd and async_tail
#    pip install aprsd async-tail
# 4) update DIREWOLF_LOG setting to point to your active direwolf.log
# 5) run it
#    python test.py
import asyncio | |
from rich.console import Console | |
import re | |
import aprslib | |
from aprsd.packets import core | |
from async_tail import atail | |
DIREWOLF_LOG = '/home/pi/direwolf.log'

# "<0xNN>" tokens found in the log text, mapped to the single character each
# one stands for.  Note the escapes are '\x1f' / '\x0a'; writing '\0x1f'
# would produce a NUL byte followed by the literal text "x1f".
_CONTROL_TOKENS = {
    '<0x0d>': '\x0d',
    '<0x1c>': '\x1c',
    '<0x1e>': '\x1e',
    '<0x1f>': '\x1f',
    '<0x0a>': '\x0a',
}

# Line prefixes that mark a packet, checked in this order.  RX lines start
# with a bracketed number such as "[0.3] ", TX lines with a bracketed digit
# followed by H or L such as "[0L] ".  Group 1 is the raw packet text.
_LINE_PATTERNS = (
    ("RX", re.compile(r"^\[\d\.*\d*\] (.*)")),
    ("TX", re.compile(r"^\[\d[HL]\] (.*)")),
)


def decode_control_tokens(text: str) -> str:
    """Return *text* with every known "<0xNN>" token replaced by its character."""
    for token, char in _CONTROL_TOKENS.items():
        text = text.replace(token, char)
    return text


def extract_packet(log_line: str):
    """Pull the raw APRS packet text out of one log line.

    Returns a ``(direction, packet_text)`` tuple, where *direction* is
    ``"RX"`` or ``"TX"`` and *packet_text* has its "<0xNN>" tokens decoded,
    or ``None`` when the line matches neither the RX nor the TX pattern.
    """
    for direction, pattern in _LINE_PATTERNS:
        found = pattern.search(log_line)
        if found:
            return direction, decode_control_tokens(found.group(1))
    return None


async def main():
    """Tail DIREWOLF_LOG and print every RX/TX packet as an APRSD Packet.

    Lines that match neither pattern are reported and skipped.  A packet
    that aprslib cannot parse is reported and skipped as well, so a single
    bad line does not stop the tail loop.
    """
    cs = Console()
    cs.print("Starting up!")
    async for line in atail(DIREWOLF_LOG):
        # The log text is read from the first element of each yielded item;
        # empty items and empty text are ignored.
        if not line or not line[0]:
            continue

        extracted = extract_packet(line[0])
        if extracted is None:
            cs.print("Line doesn't match")
            continue

        direction, packetstring = extracted
        cs.print(f"Found a {direction} output!")
        try:
            pkt_dict = aprslib.parse(packetstring)
        except (aprslib.ParseError, aprslib.UnknownFormat) as exc:
            # markup=False: the message may contain packet text with square
            # brackets, which rich would otherwise try to read as markup.
            cs.print(f"Unable to parse {direction} packet: {exc}", markup=False)
            continue
        packet = core.Packet.factory(pkt_dict)
        cs.print(packet)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment