Start the Python server, then open the web page in a browser.
Last active
August 12, 2021 06:34
-
-
Save stek29/876dd36da29223994441cab4e96bc7eb to your computer and use it in GitHub Desktop.
miot vacuum remote control with JS Gamepad API with analog input
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<meta charset="UTF-8"> | |
<meta http-equiv="X-UA-Compatible" content="IE=edge"> | |
<meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
<title>Vacuum Control</title> | |
</head> | |
<body> | |
<input id="triggerControl" type="checkbox" checked> | |
<label for="triggerControl">accel/decel with triggers instead of left stick</label> | |
<pre id="status">No Controller Connected</pre> | |
<pre id="wsstatus">WebSocket not ready</pre> | |
<script type="text/javascript"> | |
// WebSocket link to the Python server. The server's first message carries a
// `config` object (angle / velocity ranges) that sendMsg needs before it can
// scale any input.
// Use wss:// when the page itself was loaded over https, otherwise browsers
// block the connection as mixed content.
const wsScheme = location.protocol === 'https:' ? 'wss' : 'ws';
let ws = new WebSocket(`${wsScheme}://${location.host}/ws`);
let wsStatusElement = document.getElementById('wsstatus');
let wsConfig;
ws.onopen = () => {
  wsStatusElement.textContent = 'WebSocket connected';
};
ws.onclose = () => {
  wsStatusElement.textContent = 'WebSocket closed';
  // Dropping the config makes sendMsg a no-op once the socket is gone.
  wsConfig = null;
};
ws.onmessage = event => {
  // textContent, not innerHTML: the payload is shown verbatim and must never
  // be parsed as markup.
  wsStatusElement.textContent = event.data;
  let msg;
  try {
    msg = JSON.parse(event.data);
  } catch (err) {
    // Not JSON: it is already displayed above, there is nothing to extract.
    return;
  }
  if (msg && msg.config) {
    wsConfig = msg.config;
  }
};
// Scale two normalized inputs by the ranges received from the server and send
// them as a JSON `{ angle, velocity }` message.
//   angleCoef - steering coefficient, multiplied by wsConfig.angle
//   accelCoef - throttle coefficient, multiplied by wsConfig.velocity
// Does nothing until the server config has arrived or once the socket is no
// longer open.
let sendMsg = function (angleCoef, accelCoef) {
  if (!wsConfig || ws.readyState !== WebSocket.OPEN) {
    return;
  }
  // Math.trunc instead of parseInt: parseInt converts its argument to a string
  // first, so a tiny value such as 1.8e-7 becomes "1.8e-7" and parses as 1.
  const angle = Math.trunc(angleCoef * wsConfig.angle);
  const velocity = Math.trunc(accelCoef * wsConfig.velocity);
  let msg = JSON.stringify({ angle, velocity });
  wsStatusElement.textContent = msg;
  ws.send(msg);
}
</script> | |
<script type="text/javascript"> | |
// Gamepad state and input mapping shared by the polling loop.
let statusElement = document.getElementById('status');
// Index of the most recently connected gamepad; undefined while none is known.
let gamepadIndex;
let triggerControlEl = document.getElementById('triggerControl');
window.addEventListener('gamepadconnected', (event) => {
  gamepadIndex = event.gamepad.index;
});
// Forget the pad when it is unplugged so the poller stops reading a dead slot.
window.addEventListener('gamepaddisconnected', (event) => {
  if (event.gamepad.index === gamepadIndex) {
    gamepadIndex = undefined;
    statusElement.textContent = 'No Controller Connected';
  }
});
// Axis configs: which entry of gamepad.axes to read, whether to flip its sign,
// and the magnitude below which the reading is treated as zero.
const axisAngle = { 'index': 0, 'reverse': true, 'deadzone': 0.1 };
const axisAccel = { 'index': 1, 'reverse': true, 'deadzone': 0.1 };
// Button configs: entries of gamepad.buttons used when trigger control is on.
const btnDecel = { 'index': 6 };
const btnAccel = { 'index': 7 };
// Read one axis according to `cfg` ({ index, reverse, deadzone }).
// Readings whose magnitude is below the deadzone collapse to 0; otherwise the
// raw value is returned, negated when `cfg.reverse` is set.
let getAxis = (axes, cfg) => {
  const raw = axes[cfg.index];
  if (Math.abs(raw) < cfg.deadzone) {
    return 0;
  }
  const sign = cfg.reverse ? -1 : 1;
  return raw * sign;
}
// Turn one gamepad snapshot into a steering/throttle pair and hand it to
// sendMsg. Steering always comes from the angle axis. Throttle is either
// (accel trigger - decel trigger) when the checkbox is ticked, or the accel
// axis otherwise.
let sendInput = ({ buttons, axes }) => {
  const steer = getAxis(axes, axisAngle);
  let throttle;
  if (triggerControlEl.checked) {
    throttle = buttons[btnAccel.index].value - buttons[btnDecel.index].value;
  } else {
    throttle = getAxis(axes, axisAccel);
  }
  return sendMsg(steer, throttle);
};
// Poll the gamepad every 100 ms: refresh the on-page status and forward the
// current input to the server.
setInterval(() => {
  if (gamepadIndex === undefined) {
    return;
  }
  const gamepad = navigator.getGamepads()[gamepadIndex];
  // The slot is null once the pad has been disconnected.
  if (!gamepad) {
    return;
  }
  // Label each button with its real index BEFORE filtering; mapping after the
  // filter would number the pressed buttons 0, 1, 2... regardless of which
  // ones they are.
  const pressedLines = gamepad.buttons
    .map((btn, i) => (btn.pressed ? `button ${i} is pressed` : null))
    .filter(line => line !== null);
  const axisLines = gamepad.axes.map((v, i) => `axis ${i}: ${v}`);
  // One join over both lists so the last button line and the first axis line
  // are separated as well.
  statusElement.innerHTML = pressedLines.concat(axisLines).join('<br>');
  sendInput(gamepad);
}, 100);
</script> | |
</body> | |
</html> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from miio import MiotDevice, exceptions | |
from miio.miioprotocol import MiIOProtocol | |
import argparse | |
from datetime import datetime | |
import json | |
from typing import Callable | |
def _str2bool(value: str) -> bool:
    """Parse a command-line boolean such as ``true``/``false``/``1``/``0``.

    Raises:
        argparse.ArgumentTypeError: if *value* is not a recognised spelling.
    """
    lowered = value.strip().lower()
    if lowered in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if lowered in ('0', 'false', 'f', 'no', 'n', 'off', ''):
        return False
    raise argparse.ArgumentTypeError(f'expected a boolean, got {value!r}')


# Command-line interface of the script.
parser = argparse.ArgumentParser(description='dreame remote control')
parser.add_argument('--ip', help='ip addr of robot', required=True)
parser.add_argument('--token', help='miot token', required=True)
parser.add_argument('--angle', help='angle range', default=60, type=int)
parser.add_argument('--velocity', help='velocity range', default=180, type=int)
parser.add_argument('--host', help='http/ws bind host',
                    default='0.0.0.0', type=str)
parser.add_argument('--port', help='http port', default=8080, type=int)
# type=bool would turn every non-empty string (including "False") into True.
# nargs='?' with const=True additionally allows a bare `--tty`.
parser.add_argument('--tty', help='tty mode', default=False,
                    type=_str2bool, nargs='?', const=True)
_ACTIONS = { | |
'step': {'siid': 21, 'aiid': 1}, | |
'stop': {'siid': 21, 'aiid': 2}, | |
} | |
class RemoteControl:
    """Drive the robot through the 'step' and 'stop' actions in ``_ACTIONS``.

    The device is created lazily through *get_device* and dropped again after
    a failed step, so the next call reconnects.
    """

    def __init__(self, get_device: Callable[[], MiotDevice]):
        """Store the device factory; no device is created yet."""
        self.get_device = get_device
        self.device: "MiotDevice | None" = None
        # Last angle / velocity sent; reused when step() gets None for either.
        self.angle = 0
        self.velocity = 0

    def _ensure_device(self) -> MiotDevice:
        """Return the current device, creating it first when there is none."""
        if self.device is None:
            self.device = self.get_device()
        return self.device

    def step(self, angle=None, velocity=None):
        """Send one 'step' action.

        Args:
            angle: new angle, or None to keep the previous one.
            velocity: new velocity, or None to keep the previous one.

        Returns:
            True when the response's ``code`` is 0, False when the call raised
            ``DeviceException`` (the device is then discarded).
        """
        device = self._ensure_device()
        if angle is not None:
            self.angle = angle
        if velocity is not None:
            self.velocity = velocity
        try:
            res = device.call_action_by(**_ACTIONS['step'], params=[
                {'piid': 1, 'value': str(self.angle)},
                {'piid': 2, 'value': str(self.velocity)},
            ])
        except exceptions.DeviceException:
            # force reconnect
            self.device = None
            return False
        return res['code'] == 0

    def stop(self):
        """Send the 'stop' action.

        The device is created on demand: stop() may be the very first call, or
        may follow a failed step() that discarded the device.
        """
        self._ensure_device().call_action_by(**_ACTIONS['stop'])
def start_tty(ctl: RemoteControl, velocity, angle):
    """Drive the robot from the keyboard until 'q', Ctrl-C or end of input.

    Args:
        ctl: remote control that receives the step / stop calls.
        velocity: velocity sent for 'w' (positive) and 's' (negative).
        angle: angle sent for 'a' (positive) and 'd' (negative).
    """
    import sys
    import termios
    import tty

    # key -> (angle, velocity) passed to ctl.step
    moves = {
        'w': (0, +velocity),
        'a': (+angle, 0),
        's': (0, -velocity),
        'd': (-angle, 0),
    }

    saved_attrs = termios.tcgetattr(sys.stdin)
    try:
        tty.setcbreak(sys.stdin)
        print('wasd for control, q for exit')
        while True:
            try:
                key = sys.stdin.read(1)
            except KeyboardInterrupt:
                key = 'q'
            # read() returns '' at end of input; treat it like 'q' instead of
            # failing on an empty string.
            if key in ('q', ''):
                ctl.stop()
                break
            if key in moves:
                step_angle, step_velocity = moves[key]
                ctl.step(angle=step_angle, velocity=step_velocity)
    finally:
        # Always put the terminal back, even when a step/stop call raised.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, saved_attrs)
def start_http(ctl: RemoteControl, velocity: int, angle: int, host: str, port: int):
    """Serve the web UI and the control WebSocket until interrupted.

    Args:
        ctl: remote control that receives the step calls.
        velocity: velocity range sent to each client in the initial config.
        angle: angle range sent to each client in the initial config.
        host: address to bind the HTTP/WebSocket server to.
        port: TCP port to listen on.
    """
    import asyncio
    import pathlib

    import aiohttp
    from aiohttp import web

    async def handle_ws(request: web.Request):
        """Send the config, then turn every text message into a step."""
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        print('new ws connection')
        await ws.send_str(json.dumps({'config': {
            'angle': angle, 'velocity': velocity,
        }}))
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    event = json.loads(msg.data)
                except json.JSONDecodeError:
                    event = None
                if not isinstance(event, dict):
                    # Malformed or non-object payload: report failure but keep
                    # the connection alive instead of dying in the handler.
                    await ws.send_str(json.dumps({'status': False}))
                    continue
                m_angle = event.get('angle')
                m_velocity = event.get('velocity')
                if ctl.angle == 0 and ctl.velocity == 0 and not (m_angle or m_velocity):
                    # skip stepping if staying zero
                    ok = True
                else:
                    # NOTE(review): ctl.step is a synchronous call made from
                    # the event loop - confirm that blocking here is acceptable.
                    ok = ctl.step(angle=m_angle, velocity=m_velocity)
                await ws.send_str(json.dumps({'status': ok}))
            elif msg.type == aiohttp.WSMsgType.ERROR:
                print('ws connection closed with exception %s' %
                      ws.exception())
        return ws

    async def handle_http(_: web.Request):
        """Redirect the site root to the statically served index page."""
        raise web.HTTPFound('/s/index.html')

    def create_runner():
        """Build the aiohttp application and wrap it in an AppRunner."""
        app = web.Application()
        app.add_routes([
            web.get('/', handle_http),
            web.get('/ws', handle_ws),
        ])
        # Serves the directory that contains this script under /s.
        app.router.add_static('/s', pathlib.Path(__file__).parent.resolve())
        return web.AppRunner(app)

    async def start_server():
        """Start listening and return the runner so it can be cleaned up."""
        runner = create_runner()
        await runner.setup()
        site = web.TCPSite(runner, host, port)
        await site.start()
        print(f'starting server on {host}:{port}')
        return runner

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    runner = loop.run_until_complete(start_server())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(runner.cleanup())
        loop.close()
class PatchedProtocol(MiIOProtocol):
    """MiIOProtocol whose ``_id`` getter wraps around at ``MAX_ID``."""

    # Ids at or above this value wrap back to 1.
    MAX_ID = 65535

    @MiIOProtocol._id.getter
    def _id(self):
        """Advance the parent's private id counter and return the new value."""
        # `_MiIOProtocol__id` is the name-mangled form of MiIOProtocol's
        # private `__id` attribute.
        next_id = self._MiIOProtocol__id + 1
        if next_id >= PatchedProtocol.MAX_ID:
            next_id = 1
        self._MiIOProtocol__id = next_id
        return next_id
if __name__ == '__main__':
    args = parser.parse_args()

    def get_device():
        """Create a MiotDevice whose protocol is replaced by PatchedProtocol."""
        # dirty hack for sequence ids: derive the start id from the clock.
        # datetime.now() instead of the deprecated datetime.utcnow(): calling
        # timestamp() on the naive utcnow() value treats it as local time, so
        # the result was shifted by the local UTC offset.
        start_id = int(datetime.now().timestamp() * 100) % PatchedProtocol.MAX_ID
        device = MiotDevice(ip=args.ip, token=args.token, start_id=start_id)
        device.retry_count = 0
        device._protocol = PatchedProtocol(
            ip=args.ip, token=args.token, start_id=start_id)
        # Each read of `_id` advances the counter, so these two prints also
        # consume two ids.
        print('id1', device._protocol._id)
        print('id2', device._protocol._id)
        return device

    ctl = RemoteControl(get_device)
    if args.tty:
        start_tty(ctl, velocity=args.velocity, angle=args.angle)
    else:
        start_http(
            ctl, velocity=args.velocity, angle=args.angle,
            host=args.host, port=args.port)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment