Created
April 11, 2016 18:01
-
-
Save lad1337/5838561bc4c34cd5fe420c4e0c8e16c4 to your computer and use it in GitHub Desktop.
A simple CEC client built on top of the cec library (libcec Python bindings).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cec | |
class PowerStatus:
    """Truthy/falsy wrapper around a device's power state.

    The instance renders as ``'on'`` or ``'standby'`` and can be used directly
    in boolean context (``if status: ...``).

    Args:
        power_on: Any value whose truthiness says whether the device is
            powered on. It is stored unchanged on ``self.power_on``.
    """

    def __init__(self, power_on):
        self.power_on = power_on

    def __str__(self):
        """Return ``'on'`` when powered on, otherwise ``'standby'``."""
        return 'on' if self.power_on else 'standby'

    def __repr__(self):
        """Return a short debug representation, e.g. ``< Power: on >``."""
        return '< Power: %s >' % self

    def __bool__(self):
        """Return the power state as a real ``bool``.

        ``__bool__`` must return an actual ``bool``; coercing here keeps
        ``bool(status)`` working when ``power_on`` was given as e.g. ``1``.
        """
        return bool(self.power_on)
import logging | |
class CECClient:
    """Small convenience client on top of a libcec adapter connection.

    The client builds a ``cec.libcec_configuration``, creates an adapter
    through ``cec.ICECAdapter.Create`` and offers helpers that assemble raw
    CEC frames as hex strings (``"<src><dst>:<opcode>[:<operand>...]"``) and
    transmit them through the connection.

    Args:
        osd_name: Device name stored in the configuration; defaults to
            ``"py cec client"`` when empty or ``None``.
        device_types: Iterable of ``cec`` device-type constants to register.
            When empty or ``None`` a single playback device is registered.
        init: When true (default) the first detected adapter is opened
            immediately via :meth:`init`.

    Attributes:
        connected: Result of the last ``Open`` call on the adapter
            (``False`` until :meth:`init` has run).
    """

    # CEC opcodes / UI command codes as two-digit hex strings.
    _OPCODE_STANDBY = '36'
    _OPCODE_USER_CONTROL_PRESSED = '44'
    _OPCODE_USER_CONTROL_RELEASED = '45'
    _OPCODE_ACTIVE_SOURCE = '82'
    _UI_COMMAND_SELECT = '00'
    _UI_COMMAND_ROOT_MENU = '09'

    def __init__(self, osd_name=None, device_types=None, init=True):
        self.logger = logging.getLogger('CECClient')
        self.logger_bus = logging.getLogger('CECClient.bus')

        self.cecconfig = cec.libcec_configuration()
        self.cecconfig.strDeviceName = osd_name or "py cec client"
        self.cecconfig.bActivateSource = 0
        self.cecconfig.clientVersion = cec.LIBCEC_VERSION_CURRENT
        self.cecconfig.SetLogCallback(self.log_callback)
        if device_types:
            for device_type in device_types:
                self.cecconfig.deviceTypes.Add(device_type)
        else:
            self.cecconfig.deviceTypes.Add(cec.CEC_DEVICE_TYPE_PLAYBACK_DEVICE)

        self.connection = cec.ICECAdapter.Create(self.cecconfig)
        self.connected = False
        # Lazily filled caches, see the ``logical_address`` / ``devices`` properties.
        self._logical_address = None
        self._devices = None
        if init:
            self.init()

    def init(self):
        """Open the first detected adapter.

        Cached bus information (own logical address, device table) is dropped
        so that it is looked up again on the freshly opened connection.

        Raises:
            ConnectionError: If no adapter is detected.
        """
        self._logical_address = None
        self._devices = None
        adapters = self.connection.DetectAdapters()
        if not adapters:
            raise ConnectionError('No Adapters found')
        self.connected = self.connection.Open(adapters[0].strComName)

    def log_callback(self, level, time, message):
        """Forward a libcec log line to the ``CECClient.bus`` logger.

        Args:
            level: libcec log level constant (``cec.CEC_LOG_*``).
            time: Timestamp supplied by the library (unused).
            message: Log text.
        """
        # Built per call on purpose: the constants live in the ``cec`` module,
        # so they are only resolved once the callback actually fires.
        log_level_map = {
            cec.CEC_LOG_ERROR: logging.ERROR,
            cec.CEC_LOG_WARNING: logging.WARNING,
            cec.CEC_LOG_NOTICE: logging.INFO,
            cec.CEC_LOG_TRAFFIC: logging.DEBUG,
            cec.CEC_LOG_DEBUG: logging.DEBUG,
        }
        # Unknown levels must not raise inside the library callback; treat
        # them as debug output.
        self.logger_bus.log(log_level_map.get(level, logging.DEBUG), message)

    @property
    def logical_address(self):
        """Primary logical address of this client, fetched once and cached."""
        if self._logical_address is None:
            self._logical_address = self.connection.GetLogicalAddresses().primary
        return self._logical_address

    @property
    def devices(self):
        """Device table as produced by :meth:`scan`, scanned on first access."""
        if self._devices is None:
            self._devices = self.scan()
        return self._devices

    def raw_command(self, data):
        """Parse the frame string *data* and transmit it.

        Args:
            data: Frame in libcec string form, e.g. ``"40:44:00"``.

        Returns:
            Whatever the connection's ``Transmit`` returns.
        """
        self.logger.debug('Sending command: %s', data)
        cmd = self.connection.CommandFromString(data)
        return self.connection.Transmit(cmd)

    def scan(self):
        """Query every active device and refresh the cached device table.

        Returns:
            dict mapping logical address to a dict with the keys
            ``vendor_id``, ``phisical_address`` (sic), ``logical_address``,
            ``active``, ``cec_version``, ``power_status`` (a
            :class:`PowerStatus`) and ``osd_name``.
        """
        addresses = self.connection.GetActiveDevices()
        devices = {}
        # Logical addresses 0-14 are probed; 15 is not (in CEC it is the
        # broadcast / unregistered address).
        for address in range(15):
            if not addresses.IsSet(address):
                continue
            devices[address] = {
                'vendor_id': self.connection.GetDeviceVendorId(address),
                'phisical_address': int(str(self.connection.GetDevicePhysicalAddress(address))),
                'logical_address': address,
                'active': self.connection.IsActiveSource(address),
                'cec_version': self.connection.GetDeviceCecVersion(address),
                # Power status 0 is treated as "on" (CEC_POWER_STATUS_ON in
                # libcec - NOTE(review): confirm against the installed version).
                'power_status': PowerStatus(self.connection.GetDevicePowerStatus(address) == 0),
                'osd_name': self.connection.GetDeviceOSDName(address),
            }
        self._devices = devices
        return devices

    def button_release(self, dst, src=None):
        """Send User Control Released (opcode 45) from *src* to *dst*.

        Args:
            dst: Destination logical address.
            src: Initiator logical address; defaults to this client's own
                address. ``0`` is honoured as an explicit address.
        """
        if src is None:
            src = self.logical_address
        return self.raw_command('{src:x}{dst:x}:{opcode}'.format(
            src=src, dst=dst, opcode=self._OPCODE_USER_CONTROL_RELEASED))

    def _button_press(self, button, dst, release, src=None):
        """Send User Control Pressed (opcode 44) with UI command *button*.

        Args:
            button: Two-digit hex UI command code, e.g. ``"00"`` for Select.
            dst: Destination logical address.
            release: When true, follow the press with a release frame.
            src: Initiator logical address; defaults to this client's own.

        Returns:
            Truthy only if the press (and the release, when sent) succeeded.
        """
        if src is None:
            src = self.logical_address
        result_press = self.raw_command('{src:x}{dst:x}:{opcode}:{button}'.format(
            src=src, dst=dst, opcode=self._OPCODE_USER_CONTROL_PRESSED, button=button))
        result_release = True
        if release:
            # Release from the same initiator that sent the press.
            result_release = self.button_release(dst, src=src)
        return result_press and result_release

    def button_menu(self, dst, release=True):
        """Press (and by default release) the Root Menu button on *dst*."""
        return self._button_press(self._UI_COMMAND_ROOT_MENU, dst, release)

    def button_select(self, dst, release=True):
        """Press (and by default release) the Select button on *dst*."""
        return self._button_press(self._UI_COMMAND_SELECT, dst, release)

    def all_standby(self, src=None, dst=None):
        """Send Standby (opcode 36) from *src* to *dst*.

        Args:
            src: Initiator logical address; defaults to this client's own.
            dst: Destination logical address; defaults to ``0``.
        """
        if src is None:
            src = self.logical_address
        if dst is None:
            dst = 0
        return self.raw_command('{src:x}{dst:x}:{opcode}'.format(
            src=src, dst=dst, opcode=self._OPCODE_STANDBY))

    def active_source(self, logical_address=None, phisical_address=None):
        """Broadcast Active Source (opcode 82) for a physical address.

        Args:
            logical_address: Device whose physical address is looked up in
                :attr:`devices`; only used when *phisical_address* is not given.
            phisical_address: 16-bit physical address as an integer
                (e.g. ``0x1000`` for 1.0.0.0). Takes precedence.

        Raises:
            ValueError: If neither argument is given.
            KeyError: If *logical_address* is not in the device table.
        """
        if logical_address is None and phisical_address is None:
            raise ValueError('no logical_address nor phisical_address given')
        if phisical_address is None:
            phisical_address = self.devices[logical_address]['phisical_address']
        # Four hex digits -> two operand bytes, e.g. 0x1000 -> "10:00".
        digits = '{:04x}'.format(phisical_address)
        target = digits[0:2] + ':' + digits[2:4]
        return self.raw_command('{src:x}F:{opcode}:{target}'.format(
            src=self.logical_address, opcode=self._OPCODE_ACTIVE_SOURCE, target=target))
# Demo: open the first adapter, then dump what is visible on the bus.
# Root logging is configured first so the client's log records are emitted.
logging.basicConfig()

# Constructing the client opens the adapter right away (init defaults to True).
c = CECClient()

# Show the client's own debug output (e.g. every command that is sent).
c.logger.setLevel(logging.DEBUG)

# Our own logical address first, then the table of active devices.
print(c.logical_address)
print(c.scan())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment