Skip to content

Instantly share code, notes, and snippets.

@tausen
Created September 22, 2023 20:31
Show Gist options
  • Save tausen/d79ebc7f62845f0159d0291533886155 to your computer and use it in GitHub Desktop.
Save tausen/d79ebc7f62845f0159d0291533886155 to your computer and use it in GitHub Desktop.
InfiniTime Bluetooth attribute logging
#!/usr/bin/env python3
import time
import struct
import asyncio
from bleak import BleakClient, BleakScanner, exc
from bleak.backends.characteristic import BleakGATTCharacteristic
# Bluetooth SIG base-UUID form of the 16-bit characteristic 0x2A37
# (Heart Rate Measurement); main() subscribes to it for heart-rate data.
UUID_HEARTRATE = '00002a37-0000-1000-8000-00805f9b34fb'
# Vendor-specific characteristic that main() subscribes to for motion data
# (presumably InfiniTime's raw motion values -- confirm against its BLE docs).
UUID_MOTION = '00030002-78fc-48fe-8e23-433b3a1942d0'
# Output file handles. They start as None and are assigned by main(); each
# notification handler does nothing while its handle is still None.
OUTFILE_HR = None
OUTFILE_MOTION = None
def heartrate_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
    """Log one Heart Rate Measurement notification to stdout and OUTFILE_HR.

    The payload is decoded following the GATT Heart Rate Measurement layout:
    byte 0 carries flags, and bit 0 of the flags selects whether the heart
    rate that follows is a uint8 (bit clear) or a little-endian uint16
    (bit set). Optional fields after the value are ignored.

    Nothing is logged while OUTFILE_HR is None, when the payload is too
    short to contain a value, or when the decoded rate is 0 (presumably the
    watch reports 0 when it has no reading -- confirm against the firmware).

    Args:
        characteristic: Characteristic that produced the notification
            (unused).
        data: Raw notification payload.
    """
    if OUTFILE_HR is None:
        return

    # 16-bit values need flags + 2 bytes, 8-bit values need flags + 1 byte.
    wide_value = len(data) >= 1 and bool(data[0] & 0x01)
    needed = 3 if wide_value else 2
    if len(data) < needed:
        # Report and drop instead of raising inside the notification callback.
        print(f"hr: ignoring malformed payload {bytes(data).hex()}")
        return

    if wide_value:
        hr = struct.unpack("<H", data[1:3])[0]
    else:
        hr = data[1]

    if hr != 0:
        print(f"hr: {hr}")
        OUTFILE_HR.write(f"{time.time()}\t{hr}\n")
        # Flush per sample so the log survives an abrupt stop.
        OUTFILE_HR.flush()
def motion_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
    """Log one motion notification to stdout and OUTFILE_MOTION.

    The payload is unpacked as three little-endian signed 16-bit integers
    and written as one tab-separated row: timestamp, then the three values.
    Nothing happens while OUTFILE_MOTION is None.

    Args:
        characteristic: Characteristic that produced the notification
            (unused).
        data: Raw notification payload; must be exactly 6 bytes, otherwise
            ``struct.unpack`` raises ``struct.error``.
    """
    if OUTFILE_MOTION is None:
        return

    mtn = struct.unpack("<hhh", data)
    print(f"motion: {mtn}")

    OUTFILE_MOTION.write(f"{time.time()}\t{mtn[0]}\t{mtn[1]}\t{mtn[2]}\n")
    # Flush per sample so the log is complete up to the last notification.
    OUTFILE_MOTION.flush()
async def main(
    device_name: str = "InfiniTime",
    adapter: str = "hci1",
    reconnect_delay: float = 30,
):
    """Log heart-rate and motion notifications until an error or cancellation.

    Opens ``hr.tsv`` and ``motion.tsv`` for writing (truncating any existing
    content) and publishes them through the module-level ``OUTFILE_HR`` and
    ``OUTFILE_MOTION`` handles that the notification handlers write to. It
    then loops forever: scan for the device, connect, subscribe to both
    characteristics, wait for the disconnect callback, pause, scan again.

    Both files are closed on every exit path. That includes task
    cancellation and ``KeyboardInterrupt``, which derive from
    ``BaseException`` and would slip past an ``except Exception`` cleanup.

    Args:
        device_name: Advertised device name to scan for.
        adapter: Bluetooth adapter name forwarded to the scanner.
        reconnect_delay: Seconds to wait after a session ends before
            scanning again.

    Raises:
        RuntimeError: If a scan does not find the device.
    """
    global OUTFILE_HR, OUTFILE_MOTION

    disconnected_event = asyncio.Event()

    def disconnected_callback(_):
        # Invoked by the client when the link drops; wakes the wait below.
        print("Disconnected")
        disconnected_event.set()

    # The context manager closes both files however the loop ends, and
    # closes hr.tsv if opening motion.tsv fails.
    with open("hr.tsv", "w", encoding="utf8") as hr_file, \
            open("motion.tsv", "w", encoding="utf8") as motion_file:
        OUTFILE_HR, OUTFILE_MOTION = hr_file, motion_file

        while True:
            device = await BleakScanner.find_device_by_name(
                device_name, adapter=adapter
            )
            if device is None:
                raise RuntimeError("could not find device")

            async with BleakClient(
                device, disconnected_callback=disconnected_callback
            ) as client:
                print("Connected")
                await client.start_notify(UUID_HEARTRATE, heartrate_handler)
                await client.start_notify(UUID_MOTION, motion_handler)

                # Block here until disconnected_callback fires.
                await disconnected_event.wait()
                disconnected_event.clear()

                # Best-effort unsubscribe; a BleakError here is only reported.
                try:
                    await client.stop_notify(UUID_HEARTRATE)
                    await client.stop_notify(UUID_MOTION)
                except exc.BleakError:
                    print("Could not stop notify")

            # Pause before scanning again.
            await asyncio.sleep(reconnect_delay)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C ends the logging session quietly. Make sure both log files
        # are closed before exiting. A handle is still None if the interrupt
        # arrived before main() opened it, and close() on an already-closed
        # file is a no-op, so no `.closed` check is needed.
        for outfile in (OUTFILE_HR, OUTFILE_MOTION):
            if outfile is not None:
                outfile.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment