Skip to content

Instantly share code, notes, and snippets.

@chrisns
Last active August 23, 2024 13:01
Show Gist options
  • Save chrisns/49fddc2f14eb93e8852f10a9c5399f07 to your computer and use it in GitHub Desktop.
Save chrisns/49fddc2f14eb93e8852f10a9c5399f07 to your computer and use it in GitHub Desktop.
hisense udp remote
#!/bin/sh
# for example to go to the previous input option:
./hisense.py -d $IP_ADDRESS KEY_RETURNS KEY_HOME KEY_DOWN KEY_DOWN KEY_DOWN KEY_DOWN KEY_DOWN KEY_DOWN KEY_DOWN KEY_DOWN KEY_DOWN KEY_DOWN KEY_DOWN DELAY KEY_RIGHT KEY_OK
# note it gets stuck if it goes into anycast and no longer responds to UDP :(
#!/usr/bin/env python3
from socket import *
from select import select
from time import sleep
import sys
from getopt import getopt, GetoptError
class CM(object):
    """First stage of the UDP handshake with the device.

    Sends a ``CTCREATE`` request to ``host`` and, when a reply arrives,
    reads a port number out of it and starts a ``CT`` object aimed at that
    port on the same address.

    Instances expose ``fileno()``, ``read()`` and ``write()`` so they can be
    placed directly in the lists handed to ``select``.
    """

    def __init__(self, host, inputs, outputs):
        """Open a non-blocking UDP socket and register with the select lists.

        Args:
            host: ``(address, port)`` tuple used as the ``sendto`` target.
            inputs: list of readable objects; this instance appends itself.
            outputs: list of writable objects; this instance appends itself.
        """
        self.host = host
        self.inputs = inputs
        self.outputs = outputs
        print('cm_open_connection: ', host)
        self.sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
        self.sock.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
        self.sock.setblocking(0)
        self.inputs.append(self)
        self.outputs.append(self)
        # 0: request not sent yet, 1: waiting for the first reply,
        # 2: waiting for a second reply, 3: finished.
        self.state = 0

    @staticmethod
    def _parse_port(data):
        """Return the integer after the colon on the second CRLF line of *data*."""
        text = data.decode('utf-8').rstrip('\0\r\n')
        return int(text.split('\r\n')[1].split(':')[1].strip(' '))

    def write(self):
        """Send the initial ``CTCREATE`` request, then stop asking to write."""
        if self.state == 0:
            self.sock.sendto(b'CTCREATE\r\nID: 93901743\r\n\r\n\0', self.host)
            self.outputs.remove(self)
            self.state += 1

    def read(self):
        """Consume one datagram and advance the handshake.

        Raises:
            IndexError, ValueError, UnicodeDecodeError: if the first reply
                does not have the expected ``<name>: <port>`` second line.
        """
        try:
            data, _server = self.sock.recvfrom(1024)
        except (timeout, BlockingIOError):
            # The socket is non-blocking, so "nothing to read" surfaces as
            # BlockingIOError rather than socket.timeout.
            print('cm_timeout')
            return
        if self.state == 1:
            CT((self.host[0], self._parse_port(data)), self.inputs, self.outputs)
            self.state += 1
        elif self.state == 2:
            # A second reply is only acknowledged: nothing in this class
            # uses the port it carries, so it is no longer parsed.
            self.state += 1

    def fileno(self):
        """Return the socket's file descriptor so ``select`` can poll this object."""
        return self.sock.fileno()
class CT(object):
    """Second stage of the UDP handshake.

    Sends ``SUS``, then ``CTCREATE``, waits for a reply, sends ``CCCREATE``,
    and finally reads a port number from the next reply and starts a ``CC``
    object aimed at that port on the same address.

    Instances expose ``fileno()``, ``read()`` and ``write()`` so they can be
    placed directly in the lists handed to ``select``.
    """

    def __init__(self, host, inputs, outputs):
        """Open a non-blocking UDP socket and register with the select lists.

        Args:
            host: ``(address, port)`` tuple used as the ``sendto`` target.
            inputs: list of readable objects; this instance appends itself.
            outputs: list of writable objects; this instance appends itself.
        """
        self.host = host
        self.inputs = inputs
        self.outputs = outputs
        print('ct_open_connection: ', host)
        self.sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
        self.sock.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
        self.sock.setblocking(0)
        self.inputs.append(self)
        self.outputs.append(self)
        # 0: send SUS, 1: send CTCREATE, 2: await reply, 3: send CCCREATE,
        # 4: await reply carrying the port, 5: finished.
        self.state = 0

    def _send(self, payload):
        """Log *payload* and send it to the device."""
        print('ct_send: ', payload)
        self.sock.sendto(payload, self.host)

    def write(self):
        """Send whichever request the current state calls for."""
        if self.state == 0:
            self._send(b'SUS\0')
            self.state += 1
        elif self.state == 1:
            self._send(b'CTCREATE\r\nMAC: appmac_appmac_app\r\nVERSION: 0001\r\n\r\n\0')
            # Nothing more to send until the device answers.
            self.outputs.remove(self)
            self.state += 1
        elif self.state == 3:
            self._send(b'CCCREATE\r\nID: 93901743\r\n\r\n\0')
            self.outputs.remove(self)
            self.state += 1

    def read(self):
        """Consume one datagram and advance the handshake.

        Raises:
            IndexError, ValueError, UnicodeDecodeError: if the reply read in
                state 4 does not have the expected ``<name>: <port>`` second
                line.
        """
        try:
            data, _server = self.sock.recvfrom(1024)
        except (timeout, BlockingIOError):
            # The socket is non-blocking, so "nothing to read" surfaces as
            # BlockingIOError rather than socket.timeout.
            print('ct_timeout')
            return
        print('ct_read: ', data)
        if self.state == 2:
            # Reply received: become writable again to send CCCREATE.
            self.outputs.append(self)
            self.state += 1
        elif self.state == 4:
            text = data.decode('utf-8').rstrip('\0\r\n')
            ccportnum = int(text.split('\r\n')[1].split(':')[1].strip(' '))
            CC((self.host[0], ccportnum), self.inputs, self.outputs)
            self.state += 1

    def fileno(self):
        """Return the socket's file descriptor so ``select`` can poll this object."""
        return self.sock.fileno()
class CC(object):
    """Final stage: sends the key names given on the command line.

    After an initial ``SUS`` datagram, each call to ``write()`` sends one
    positional command-line argument as a ``CMD`` datagram (or sleeps for one
    second when the argument is ``DELAY``) and exits the process once the
    arguments are exhausted.

    Instances expose ``fileno()``, ``read()`` and ``write()`` so they can be
    placed directly in the lists handed to ``select``.
    """

    # Index of the next positional command-line argument to send.
    keyloop = 0

    # Disabled key-name list carried over from the original source:
    #   KEY_VOLUMEUP, KEY_INPUT, KEY_INPUT1, KEY_INPUT_1, KEY_INPUT2,
    #   KEY_INPUT_2, KEY_MODE, KEY_MODE1, KEY_MODE_1, KEY_MODE2, KEY_MODE_2,
    #   KEY_SOURCE, KEY_SOURCE1, KEY_SOURCE_1, KEY_SOURCE2, KEY_SOURCE_2,
    #   KEY_HDMI, KEY_HDMI1, KEY_HDMI_1, KEY_HDMI2, KEY_HDMI_2, KEY_LIVETV,
    #   KEY_LIVE_TV, KEY_LIVE, KEY_TVLIVE, KEY_TV_LIVE, KEY_MUTE, KEY_MUTING,
    #   KEY_SLEEPTIMER, KEY_TIMER, KEY_TIMEOUT, KEY_TIMEOFF, KEY_APP,
    #   KEY_APPS, KEY_APPLICATIONS, KEY_PROG, KEY_PROGS, KEY_PROGRAMS, KEY_FN

    def __init__(self, host, inputs, outputs):
        """Open a non-blocking UDP socket and register with the select lists.

        Args:
            host: ``(address, port)`` tuple used as the ``sendto`` target.
            inputs: list of readable objects; this instance appends itself.
            outputs: list of writable objects; this instance appends itself.
        """
        self.host = host
        self.inputs = inputs
        self.outputs = outputs
        self.sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
        self.sock.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
        self.sock.setblocking(0)
        self.inputs.append(self)
        self.outputs.append(self)
        self.state = 0

    def write(self):
        """Send the next datagram for the current state.

        State 0 sends ``SUS``. State 1 sends one key per call and terminates
        the process with status 0 when no arguments remain.

        NOTE(review): nothing in this method moves the state from 1 to 2, so
        the state 3 branch here and the state 2/4 branches in ``read()`` look
        unreachable in normal operation. Confirm before relying on ``END``
        ever being sent.
        """
        if self.state == 0:
            self.sock.sendto(b'SUS\0', self.host)
            self.state += 1
        elif self.state == 1:
            # Positional arguments left over after the -l/-d options are the
            # key names to send, in order.
            _opts, args = getopt(sys.argv[1:], "ld:", ["list", "device="])
            if self.keyloop >= len(args):
                print('no further input, exiting')
                sys.exit(0)
            name = args[self.keyloop]
            if name == 'DELAY':
                print("Delaying for 1 second")
                sleep(1)
            else:
                key = name.encode()
                # 129 is the byte count of everything in the datagram except
                # the key, so the header carries the total datagram length
                # as six zero-padded digits.
                length = str(len(key) + 129).zfill(6).encode()
                self.sock.sendto(b'CMD %s\r\n1\r\n1HISENSE_DELIMITER2HISENSE_DELIMITER2HISENSE_DELIMITER%sHISENSE_DELIMITER10HISENSE_DELIMITER0HISENSE_DELIMITER0\r\n\r\n\0' % (length, key), self.host)
            self.keyloop += 1
            # Pause between consecutive commands.
            sleep(0.3)
        elif self.state == 3:
            self.sock.sendto(b'END 0000\r\n', self.host)
            self.outputs.remove(self)
            self.state += 1

    def read(self):
        """Consume one datagram; in state 4 this terminates the process."""
        try:
            self.sock.recvfrom(1024)
        except (timeout, BlockingIOError):
            # The socket is non-blocking, so "nothing to read" surfaces as
            # BlockingIOError rather than socket.timeout.
            print('cc_timeout')
            return
        if self.state == 2:
            self.outputs.append(self)
            self.state += 1
        elif self.state == 4:
            # sys.exit() rather than the interactive-only site helper exit().
            sys.exit()

    def fileno(self):
        """Return the socket's file descriptor so ``select`` can poll this object."""
        return self.sock.fileno()
class HisenseDataObject:
    """A ``#``-separated message with up to five fields.

    The wire form is ``method#version[#redundantip[#tvdescriptor[#extra]]]``;
    trailing fields that are ``None`` are omitted when serializing.
    """

    def __init__(self, method=None, version=None, redundantip=None, tvdescriptor=None, extra=None):
        """Store the five message fields as given."""
        self.method = method
        self.version = version
        self.redundantip = redundantip
        self.tvdescriptor = tvdescriptor
        self.extra = extra

    def __repr__(self):
        """Return a debugging representation that lists every field."""
        return "<HisenseDataObject method:%s version:%s redundantip:%s tvdescriptor:%s extra:%s>" % (
            self.method, self.version, self.redundantip, self.tvdescriptor, self.extra)

    @staticmethod
    def fromString(input):
        """Parse a ``#``-separated string into a ``HisenseDataObject``.

        At most four splits are made, so any further ``#`` characters stay
        inside ``extra``. Missing fields become ``None``, and an empty
        version is normalized to ``None``.
        """
        # Pad with None so that short inputs still yield five fields.
        fields = (input.split('#', 4) + [None] * 4)[:5]
        hdo = HisenseDataObject(*fields)
        if hdo.version == '':
            hdo.version = None
        return hdo

    def __str__(self):
        """Serialize to the ``#``-separated wire form."""
        version = '' if self.version is None else self.version
        optional = (self.redundantip, self.tvdescriptor, self.extra)
        tail = ''.join('#' + part for part in optional if part is not None)
        return "%s#%s%s" % (self.method, version, tail)

    def encode(self):
        """Return the wire form as bytes."""
        return str(self).encode()
class HisenseTV(object):
    """Namespace for device discovery."""

    @staticmethod
    def discover():
        """Broadcast a ``DISCOVERY`` message and collect the replies.

        Binds UDP port 54321, broadcasts to port 50000, and keeps reading
        until 0.2 seconds pass without a reply.

        Returns:
            dict mapping each responder's IP address to the
            ``HisenseDataObject`` parsed from its reply.
        """
        tvlist = {}
        # The context manager closes the socket even if bind/sendto raises.
        with socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) as s:
            s.settimeout(0.2)
            s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
            s.bind(('', 54321))
            s.sendto(HisenseDataObject('DISCOVERY').encode(), ('<broadcast>', 50000))
            while True:
                try:
                    data, server = s.recvfrom(1024)
                except timeout:
                    break
                # Decode the bytes: str(data) would yield the "b'...'" repr
                # and corrupt the first and last fields.
                # NOTE(review): UTF-8 is assumed for replies; confirm.
                text = data.decode('utf-8', errors='replace')
                tvlist[server[0]] = HisenseDataObject.fromString(text)
        return tvlist
# tvlist = HisenseTV.discover()
# print(tvlist)
def usage():
    """Print the command-line help text, then terminate via ``sys.exit()``."""
    help_lines = (
        "-l (--list): Send a broadcast to discover available devices\n",
        "-d (--device): Pass specific device IP address (useful if more than one on the network)\n",
        "any other input(s) are considered commands to send to the TV\n",
    )
    for line in help_lines:
        print(line)
    sys.exit()
def main(argv):
    """Parse options, pick a device, and run the select-driven event loop.

    Args:
        argv: command-line arguments without the program name.

    ``-l``/``--list`` prints the discovered devices and exits. With
    ``-d``/``--device`` the given address is used; otherwise discovery must
    find exactly one device, or the process exits with status 2.
    """
    device = None
    try:
        opts, _args = getopt(argv, "ld:", ["list", "device="])
    except GetoptError as err:
        # Show which option was rejected before the generic help text.
        print(err)
        usage()
        # NOTE(review): usage() calls sys.exit() itself, so this status is
        # only reached if usage() ever returns.
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-l", "--list"):
            print(HisenseTV.discover())
            sys.exit()
        elif opt in ("-d", "--device"):
            device = arg
    if device is None:
        devices = HisenseTV.discover()
        if not devices:
            print("No devices found!")
            sys.exit(2)
        elif len(devices) > 1:
            print("Multiple devices found, please use -d to specify which one")
            sys.exit(2)
        # Discovery results are keyed by IP address; take the only one.
        device = next(iter(devices))
    inputs = []
    outputs = []
    # CM registers itself in both lists; later stages register themselves
    # the same way as the handshake progresses.
    CM((device, 60030), inputs, outputs)
    while inputs:
        readable, writable, exceptional = select(inputs, outputs, inputs)
        for s in readable:
            s.read()
        for s in writable:
            s.write()
        for s in exceptional:
            print('exceptional', s)
if __name__ == "__main__":
    # Script entry point: hand everything after the program name to main().
    cli_args = sys.argv[1:]
    main(cli_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment