|
import socket |
|
socket.setdefaulttimeout(10) |
|
import aprslib |
|
import aprs |
|
import PIL.Image |
|
import PIL.ImageDraw |
|
import PIL.ImageFont |
|
import PIL.ImageChops |
|
import requests |
|
from io import BytesIO |
|
import traceback |
|
import logging |
|
import time |
|
import datetime |
|
# Width in pixels used when wrapping message text for the docket image.
WIDTH = 384

# How long a heard message is remembered for duplicate detection.
MESSAGE_TIME = 4 * 60 * 60  # 4 hours in seconds

# (callsign, message, message number) -> time the message was first heard.
messages_heard = dict()

# Station identity; messages addressed to "<CALLSIGN>-<SSID>" are printed.
CALLSIGN = "VK4XSS"
SSID = "1"

# Base URL of the HTTP print server that receives the rendered docket.
PRINT_SERVER = "http://file.foxden:5000"
|
def callback(packet):
    """Handle one raw APRS frame received from the APRS-IS connection.

    The frame is parsed with ``aprslib``. Text messages addressed to
    ``<CALLSIGN>-<SSID>`` are printed as a docket and, when they carry a
    message number, acknowledged. A message that was already heard is only
    re-acknowledged, never printed twice.

    Args:
        packet: The raw frame; it is converted with ``bytes(packet)`` before
            parsing.

    Any exception raised while handling the frame is printed as a traceback
    and swallowed so that the receive loop keeps running.
    """
    try:
        packet = aprslib.parse(bytes(packet))
        print(packet)

        # Only text messages addressed to this station are of interest.
        if packet.get("format") != "message":
            return
        # NOTE(review): the "addresse" key spelling is kept from the original
        # code; presumably it matches what aprslib emits - confirm.
        if str(packet.get("addresse", "")).upper() != f"{CALLSIGN}-{SSID}":
            return
        # Frames without a text body (e.g. ack/rej responses) have nothing to
        # print; indexing "message_text" on them would raise KeyError.
        if "message_text" not in packet:
            return

        sender = packet["from"]
        text = packet["message_text"]
        msg_no = packet.get("msgNo")

        # A retransmission means our earlier ack was probably lost: ack again
        # but do not print a second docket.
        if msg_no is not None and isDup(sender, text, msg_no):
            sendAck(sender, msg_no)
            return

        print("ok to print")
        print_docket(sender, text, ",".join(packet["path"]))
        print("printed")

        if msg_no is not None:
            logging.info(f"Sending ack to {sender}")
            sendAck(sender, msg_no)
            logging.info(f"Sent ack to {sender}")
    except Exception:
        # Catch Exception (not a bare except) so Ctrl-C / SystemExit can still
        # stop the process, while a bad frame never kills the receive loop.
        print(traceback.format_exc())
|
|
|
class CustomFormatter(logging.Formatter):
    """Log formatter that wraps each record in an ANSI colour chosen by level.

    One ``logging.Formatter`` per level is built once at construction time
    instead of on every record. Records whose level has no entry in
    ``FORMATS`` are rendered with the same layout but without colour.
    """

    # ANSI escape sequences used to colour the output.
    grey = "\x1b[2m"
    green = "\x1b[32;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"

    # Layout shared by every level. Named LINE_FORMAT (not ``format``) so it
    # is not shadowed by the ``format`` method defined below.
    LINE_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"

    # Level number -> colourised format string.
    FORMATS = {
        logging.DEBUG: grey + LINE_FORMAT + reset,
        logging.INFO: green + LINE_FORMAT + reset,
        logging.WARNING: yellow + LINE_FORMAT + reset,
        logging.ERROR: red + LINE_FORMAT + reset,
        logging.CRITICAL: bold_red + LINE_FORMAT + reset,
    }

    def __init__(self, *args, **kwargs):
        """Build the per-level formatters; arguments go to ``logging.Formatter``."""
        super().__init__(*args, **kwargs)
        self._level_formatters = {
            level: logging.Formatter(fmt) for level, fmt in self.FORMATS.items()
        }
        # Used for custom levels that have no colour assigned.
        self._plain_formatter = logging.Formatter(self.LINE_FORMAT)

    def format(self, record):
        """Return ``record`` rendered with the formatter for its level."""
        formatter = self._level_formatters.get(record.levelno, self._plain_formatter)
        return formatter.format(record)
|
# Console handler that renders every record it receives through the
# colourised CustomFormatter.
ch = logging.StreamHandler()
ch.setFormatter(CustomFormatter())
ch.setLevel(logging.DEBUG)

# Attach the handler to the root logger.
# NOTE(review): the root logger's own level is not set in the visible code;
# if it is left at the library default, debug/info records may be filtered
# before they reach this handler - confirm this is intended.
logging.getLogger().addHandler(ch)
|
|
|
def isDup(callsign, message, messageno):
    """Report whether this message was already heard, recording it if not.

    Entries older than ``MESSAGE_TIME`` seconds are dropped from
    ``messages_heard`` before the lookup.

    Args:
        callsign: Sender of the message.
        message: Message text.
        messageno: Message number supplied by the sender.

    Returns:
        True if the (callsign, message, messageno) triple is already in
        ``messages_heard``; otherwise False, after storing the triple with
        the current time.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value. All timestamps in messages_heard are written by this
    # function, so they stay mutually comparable.
    now = datetime.datetime.now(datetime.timezone.utc)
    cutoff = now - datetime.timedelta(seconds=MESSAGE_TIME)

    # expire duplicates (iterate over a snapshot because entries are removed)
    for key, heard_at in list(messages_heard.items()):
        if heard_at < cutoff:
            logging.debug(f"Removing {key} from messages_heard : {heard_at}")
            messages_heard.pop(key, None)

    entry = (callsign, message, messageno)
    if entry in messages_heard:
        logging.debug(f"Message {entry} already heard at {messages_heard[entry]}")
        return True

    logging.debug(f"Adding {entry} to messages_heard")
    messages_heard[entry] = now
    return False
|
|
|
def sendAck(callsign, msgNo):
    """Send an APRS message acknowledgement for ``msgNo`` to ``callsign``.

    The addressee is space-padded to nine characters, the call pauses for one
    second, and the ASCII-encoded ack frame is sent on the module-level
    connection ``a``.
    """
    addressee = callsign.ljust(9)
    time.sleep(1)
    frame = f"{CALLSIGN}-{SSID}>APRS,TCPIP*::" + addressee + ":ack" + msgNo
    a.send(frame.encode("ascii"))
    print("sent ack")
|
|
|
def get_wrapped_text(text: str, font: "PIL.ImageFont.ImageFont",
                     line_length: int) -> str:
    """Word-wrap ``text`` so each rendered line fits within ``line_length``.

    Words are taken from ``text.split()``, so runs of whitespace collapse to
    single spaces. A word is never split: one that is wider than
    ``line_length`` on its own is placed alone on a line.

    Args:
        text: The text to wrap.
        font: Object providing ``getlength(str)``, used to measure a line.
        line_length: Maximum allowed line width, in the units ``getlength``
            returns.

    Returns:
        The wrapped lines joined with ``"\\n"``; an empty string when ``text``
        contains no words.
    """
    # The font annotation is quoted so it is not evaluated when the function
    # is defined.
    lines = []
    for word in text.split():
        if lines:
            candidate = f"{lines[-1]} {word}"
            if font.getlength(candidate) <= line_length:
                lines[-1] = candidate
                continue
        # First word of the text, or the word does not fit on the current
        # line: start a new line. Starting from an empty list (rather than
        # [""]) avoids a leading blank line when the very first word is
        # wider than line_length.
        lines.append(word)
    return '\n'.join(lines)
|
|
|
def print_docket(from_call, message, via):
    """Render a message as a PNG docket and send it to the print server.

    The sender is drawn in the large font at the top, the path below it in
    the smallest font, and the word-wrapped message body from y=60. The
    canvas is ``WIDTH`` pixels wide and at least 180 pixels tall; it grows
    when the wrapped body would otherwise be cut off at the bottom.

    Args:
        from_call: Callsign of the sender.
        message: Message text; explicit line breaks are preserved.
        via: Comma-joined path string.
    """
    print("printing")
    font_smaller = PIL.ImageFont.truetype("DejaVuSansMono.ttf", 18)
    font_small = PIL.ImageFont.truetype("DejaVuSansMono.ttf", 24)
    font_large = PIL.ImageFont.truetype("DejaVuSansMono.ttf", 32)

    # Wrap each source line on its own so explicit newlines survive.
    lines = "\n".join(
        get_wrapped_text(line, font_small, WIDTH) for line in message.splitlines()
    )

    body_top = 60
    height = 180  # minimum docket height
    if lines:
        # Measure the body on a scratch canvas so the real image can be made
        # tall enough to hold every wrapped line.
        scratch = PIL.ImageDraw.Draw(PIL.Image.new("RGB", (1, 1)))
        bottom = scratch.textbbox((0, body_top), lines, font=font_small)[3]
        height = max(height, int(bottom) + 1)

    image = PIL.Image.new("RGB", (WIDTH, height), (255, 255, 255))
    d = PIL.ImageDraw.Draw(image)
    d.text((0, 0), from_call, fill=(0, 0, 0), font=font_large)
    d.text((0, 30), via, fill=(0, 0, 0), font=font_smaller)
    d.text((0, body_top), lines, fill=(0, 0, 0), font=font_small)

    output_bytes = BytesIO()
    print("saving")
    image.save(output_bytes, format="png")
    output_bytes.seek(0)  # rewind so the upload reads from the start
    print("sending")
    send_to_server(output_bytes)
    print("sent to server")
|
# |
|
|
|
def send_to_server(bytes_send, timeout=10):
    """POST a rendered docket image to the print server.

    Args:
        bytes_send: File-like object holding the PNG data, positioned at the
            start of the image.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled print server would block the caller forever.

    Raises:
        requests.exceptions.RequestException: If the request fails or times
            out.
    """
    payload = {'feed': '100', 'font': 'DejaVuSansMono.ttf'}
    files = [
        ('image', ('Untitled.png', bytes_send, 'image/png')),
    ]

    response = requests.post(PRINT_SERVER, data=payload, files=files, timeout=timeout)

    print(response.text)
|
|
|
# Open the APRS-IS connection. The passcode is derived from CALLSIGN and the
# filter string is built from this station's callsign and SSID.
a = aprs.TCP(
    CALLSIGN.encode(),
    str(aprslib.passcode(CALLSIGN)).encode(),
    aprs_filter=f"g/{CALLSIGN}-{SSID}".encode(),  # filter
)
a.start()

# Clear the timeout on the connection's interface before entering the
# receive loop.
a.interface.settimeout(None)

# Hand every received frame to callback().
a.receive(callback=callback)
Fantastic!