Last active
September 18, 2022 06:56
-
-
Save xssfox/3fc32fca37e91478b3a25610c84511aa to your computer and use it in GitHub Desktop.
SSTV Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Requires an RTSP stream (I used rtsp-simple-server) and rigctld running | |
requirements pyaudio, pillow, numpy, pysstv, opencv-python | |
Update output_device_index to be your audio sink (I use pulse) | |
""" | |
import cv2 | |
from PIL import Image, ImageOps, ImageDraw, ImageFont | |
import numpy as np | |
from pysstv import color, grayscale | |
import pyaudio | |
import io | |
import time | |
import socket | |
import logging | |
import sys, traceback | |
mode = color.Robot36 | |
class Rigctld: | |
""" rotctld (hamlib) communication class """ | |
# Note: This is a massive hack. | |
def __init__(self, hostname="localhost", port=4532, poll_rate=5, timeout=5): | |
""" Open a connection to rotctld, and test it for validity """ | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.sock.settimeout(timeout) | |
self.hostname = hostname | |
self.port = port | |
self.connect() | |
logging.debug(f"Rigctl intialized") | |
def get_model(self): | |
""" Get the rotator model from rotctld """ | |
model = self.send_command(b"_") | |
return model | |
def connect(self): | |
""" Connect to rotctld instance """ | |
self.sock.connect((self.hostname, self.port)) | |
model = self.get_model() | |
if model == None: | |
# Timeout! | |
self.close() | |
raise Exception("Timeout!") | |
else: | |
return model | |
def close(self): | |
self.sock.close() | |
def send_command(self, command): | |
"""Send a command to the connected rotctld instance, | |
and return the return value. | |
""" | |
self.sock.sendall(command + b"\n") | |
try: | |
return self.sock.recv(1024) | |
except: | |
return None | |
def ptt_enable(self): | |
logging.debug(f"PTT enabled") | |
self.send_command(b"T 1") | |
def ptt_disable(self): | |
logging.debug(f"PTT disabled") | |
self.send_command(b"T 0") | |
def get_image(): | |
cap = cv2.VideoCapture('rtsp://streamip:8554/mystream') | |
ret, frame = cap.read() | |
img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
return Image.fromarray(img) | |
def generate_sstv(image): | |
wav_bytes = io.BytesIO() | |
mode(image,48000,16).write_wav(wav_bytes) | |
wav_bytes.seek(0) | |
audio = wav_bytes | |
return audio | |
rig = Rigctld() | |
p = pyaudio.PyAudio() | |
stream = p.open(format = pyaudio.paInt16, | |
channels = 1, | |
rate = 48000, | |
output = True, | |
output_device_index=13 | |
) | |
try: | |
while(1): | |
image = get_image() | |
# resize | |
image = image.resize((image.width, image.height)) | |
image = ImageOps.contain(image, (mode.WIDTH, mode.HEIGHT) ) | |
image = ImageOps.pad(image, (mode.WIDTH, mode.HEIGHT)) | |
audio = generate_sstv(image) | |
rig.ptt_enable() | |
while(audio): | |
data = audio.read(24) | |
if data: | |
stream.write(data) | |
else: | |
break | |
#stream.write(audio) | |
# play_obj = sa.play_buffer(audio, 1, 2, 48000) | |
# play_obj.wait_done() | |
rig.ptt_disable() | |
time.sleep(25) | |
except: | |
traceback.print_exc(file=sys.stderr) | |
rig.ptt_disable() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment