# Gist lubieowoce/66a300abc9d6f012ced6a60ab050573e, created March 9, 2020 13:33.
# Warning from the hosting page: this file contains bidirectional Unicode text that may be
# interpreted or compiled differently than it appears. To review, open the file in an editor
# that reveals hidden Unicode characters.
import asyncio | |
import bleak | |
# BLE UART-style service / characteristic UUIDs.
# https://os.mbed.com/teams/Bluetooth-Low-Energy/wiki/UART-access-over-BLE
# https://www.mgsuperlabs.co.in/estore/IoT-pHAT-with-header-for-Raspberry-Pi
RBL_SERVICE_UUID = "713D0000-503E-4C75-BA94-3148F18D941E"
RBL_CHAR_RX_UUID = "713D0003-503E-4C75-BA94-3148F18D941E"

# https://learn.adafruit.com/introducing-adafruit-ble-bluetooth-low-energy-friend/uart-service
ADA_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
ADA_CHAR_TX_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"

# Both known variants, in the same order (RBL first, then Adafruit).
SERVICE_UUIDS = [RBL_SERVICE_UUID, ADA_SERVICE_UUID]
# NOTE(review): the second entry is the constant named *TX*; confirm the
# RX/TX naming against the peripheral's point of view.
RX_CHARACTERISTIC_UUIDS = [RBL_CHAR_RX_UUID, ADA_CHAR_TX_UUID]

# Short IDs matching the 0002 / 0003 fields embedded in the characteristic
# UUIDs above.
TX = 0x0002
RX = 0x0003

# Board grid dimensions (columns x rows) used by encode_board.
BOARD_WIDTH = 11
BOARD_HEIGHT = 18

# NOTE(review): presumably the maximum payload per BLE write; it is not
# referenced by the code visible in this file.
CHUNK_SIZE = 20
# Sample board layout: one text line per board row, cells separated by spaces.
# Cell markers:
# S - start
# P - move
# E - end
# '.' marks an empty cell (encode_board leaves it out of the command).
# The trailing [1:-1] drops the newline right after the opening quotes and
# the one right before the closing quotes.
# NOTE(review): this layout has 21 rows while BOARD_HEIGHT is 18 -- confirm
# which of the two is intended.
BOARD = """
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . P . . . . . P . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. P . . . . . . . P .
. . P . . . . . P . .
. . . P P P P P . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
"""[1:-1]
def pulse(s):
    """Return *s* with every 'P' turned into 'S' and every 'S' into 'P'.

    Any '#' already present in *s* also comes out as 'S'.
    """
    # Single pass over the text; the '#' entry keeps pre-existing '#'
    # characters mapping to 'S'.
    swap_table = str.maketrans({'P': 'S', 'S': 'P', '#': 'S'})
    return s.translate(swap_table)
def encode_board(s: str, *, width=None, height=None) -> str:
    """Encode a textual board layout as an ``l#...#`` command string.

    Each line of *s* is one board row and its whitespace-separated tokens are
    the cells of that row. Cells are numbered row by row starting at 0 from
    the LAST text line (the text is written top-down, numbering goes
    bottom-up). Every cell other than ``'.'`` is emitted as
    ``<marker><index>``; the entries are comma-joined and wrapped in
    ``'l#'`` ... ``'#'``.

    Args:
        s: Board layout text.
        width: Cells per row; defaults to ``BOARD_WIDTH``.
        height: Number of rows; defaults to ``BOARD_HEIGHT``.

    Returns:
        The command string, e.g. ``'l#E0,S4#'``.

    Raises:
        ValueError: If the layout leaves any cell of the grid unfilled.
        IndexError: If a row has more than *width* cells, or the layout has
            at least twice *height* rows.
    """
    if width is None:
        width = BOARD_WIDTH
    if height is None:
        height = BOARD_HEIGHT

    grid = [[None] * width for _ in range(height)]
    for row, line in enumerate(s.splitlines()):
        for col, char in enumerate(line.split()):
            # Flip vertically: the first text line lands in the last grid row.
            # NOTE(review): for row >= height this index goes negative and
            # wraps around, so surplus lines overwrite the rows written by the
            # first text lines. Kept as-is because the module's sample BOARD
            # has more lines than BOARD_HEIGHT -- confirm the intended size.
            grid[height - row - 1][col] = char

    cells = [cell for grid_row in grid for cell in grid_row]
    if None in cells:
        # Fail with a clear message instead of a TypeError on None + str.
        raise ValueError(
            'board layout does not fill a %dx%d grid' % (width, height)
        )
    return 'l#' + ','.join(
        cell + str(i) for i, cell in enumerate(cells) if cell != '.'
    ) + '#'
def run(loop, coro):
    """Run *coro* to completion on *loop* and return its result."""
    outcome = loop.run_until_complete(coro)
    return outcome
def chunks(xs, n):
    """Yield consecutive lists of *n* items taken from the iterable *xs*.

    The final list may be shorter when the items run out; nothing is yielded
    for an empty *xs*. Each yielded list is a fresh object.
    """
    assert n >= 1
    pending = []
    for item in xs:
        pending.append(item)
        if len(pending) != n:
            continue
        # A full group: hand it out and start collecting a new one.
        yield pending
        pending = []
    if pending:
        # Leftover items that did not fill a whole group.
        yield pending
async def find_moonboards():
    """Scan with ``bleak.discover()`` and return the MoonBoard devices.

    Returns:
        A list of the discovered devices whose name contains 'MoonBoard'
        (empty when none match).
    """
    devices = await bleak.discover()
    # A device that reports no name (name is None) would make the substring
    # test raise TypeError, so skip nameless devices first.
    return [d for d in devices if d.name and 'MoonBoard' in d.name]
async def moonboard_pulse():
    """Send the fixed command b'l#S15,E20#' to the first MoonBoard found.

    Looks up devices via find_moonboards(), connects to the first one and
    writes the command to ADA_CHAR_TX_UUID. Prints progress messages and
    returns None in every case (also when no board is found or the
    connection attempt reports failure).
    """
    found = await find_moonboards()
    if not found:
        print('no moonboards found')
        return

    board = found[0]
    print('found', board)

    event_loop = asyncio.get_event_loop()
    async with bleak.BleakClient(board.address, loop=event_loop, address_type='public') as client:
        # NOTE(review): the async context manager presumably connects on entry
        # and disconnects on exit already -- confirm whether the explicit
        # connect()/disconnect() calls are needed with the bleak version used.
        is_connected = await client.connect()
        if not is_connected:
            print('not connected')
            return
        print('connected')
        await client.write_gatt_char(ADA_CHAR_TX_UUID, b'l#S15,E20#')
        await client.disconnect()
async def moonboard_pulse2(**kwargs):
    """Send the fixed command b'l#S15,E20#' to a hard-coded device address.

    Args:
        **kwargs: Extra keyword arguments forwarded to ``bleak.BleakClient``.

    Prints progress messages and returns None in every case.
    """
    # The address is lower-cased before being handed to the client.
    target_address = 'C5:D7:BD:EA:B3:F3'.lower()
    event_loop = asyncio.get_event_loop()
    async with bleak.BleakClient(target_address, loop=event_loop, **kwargs) as client:
        # NOTE(review): the async context manager presumably connects on entry
        # and disconnects on exit already -- confirm whether the explicit
        # connect()/disconnect() calls are needed with the bleak version used.
        is_connected = await client.connect()
        if not is_connected:
            print('not connected')
            return
        print('connected')
        await client.write_gatt_char(ADA_CHAR_TX_UUID, b'l#S15,E20#')
        await client.disconnect()
async def scan_and_get_services():
    """Discover BLE devices and list the GATT services of each one.

    Every discovered device is connected to in turn; its services are fetched,
    stored, and their UUIDs printed. A failure on one device is recorded for
    that device and does not stop the scan.

    Returns:
        ``None`` when no devices are discovered; otherwise a tuple
        ``(devices, device_services)`` where ``devices`` maps address ->
        device and ``device_services`` maps address ->
        ``{'services': <value returned by client.get_services()> or None,
        'error': <exception> or None}``.
    """
    found = await bleak.discover()
    if not found:
        print('no devices')
        return
    print('devices:')
    print(*found, sep='\n')

    devices = {dev.address: dev for dev in found}
    loop = asyncio.get_event_loop()
    device_services = {}
    for dev in devices.values():
        entry = {'services': None, 'error': None}
        device_services[dev.address] = entry
        try:
            async with bleak.BleakClient(dev.address, loop=loop) as client:
                services = await client.get_services()
                # await client.write_gatt_char(_uuid: str, data: bytearray, response: bool = False) -> None
                # await services.get_service(_uuid) -> bleak.backends.service.BleakGATTService
                # await services.get_characteristic(_uuid) -> bleak.backends.characteristic.BleakGATTCharacteristic
                # await service.get_characteristic(_uuid) -> Optional[bleak.backends.characteristic.BleakGATTCharacteristic]
                print(dev)
                # Store the result directly: the key is pre-set to None, so
                # setdefault('services', []) would hand back None and the
                # append would fail on every device.
                entry['services'] = services
                for ser in services:
                    print(ser.uuid)
                print()
        except Exception as err:
            # Deliberate best-effort: keep scanning the remaining devices and
            # report the failure through the per-device 'error' slot.
            entry['error'] = err
    return devices, device_services
# if __name__ == '__main__': | |
# from pprint import pprint | |
# loop = asyncio.get_event_loop() | |
# devices, device_services = loop.run_until_complete(scan_and_get_services()) | |
# pprint(device_services) |
# (Footer of the hosting page: sign up for free, or sign in if you already
# have an account, to comment on this gist on GitHub.)