Skip to content

Instantly share code, notes, and snippets.

@m0nochr0me
Created August 9, 2021 08:14
Show Gist options
  • Save m0nochr0me/d53c5b909832be94b0d7f3a74e2d10d7 to your computer and use it in GitHub Desktop.
Save m0nochr0me/d53c5b909832be94b0d7f3a74e2d10d7 to your computer and use it in GitHub Desktop.
MQTT-WEB bridge
#!/usr/bin/python3
import base64
import hmac
import sys
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
from urllib.parse import urlparse, parse_qs

import paho.mqtt.client as mqtt
# --- MQTT broker connection settings ---------------------------------------
# NOTE(review): the credentials and the token below are hard-coded in the
# source; consider loading them from the environment or a config file.
MQTTSRV = '192.168.1.1'  # alternative used by the author: 'localhost'
MQTTPORT = 1883
MQTTUSER = 'myuser'
MQTTPASS = 'myPa55w0rD'

# --- HTTP listener settings ------------------------------------------------
WEBADDR = '0.0.0.0'
WEBPORT = 8000

# Value every request must present in its ``Auth-Token`` header.
TOKEN = 'mySuperSecretToken'

# Topics subscribed to when the MQTT connection is established; their latest
# payloads are cached in ``DB`` and served under ``/sub/<topic>``.
subscribe_topics = ('temp0', 'lamp0', 'lamp1')

# Topics that ``/pub/<topic>`` requests are allowed to publish to.
publish_topics = ('lamp0', 'lamp1')

# In-memory cache: topic name -> last known payload (as text).
DB = {}
def on_conn(client, userdata, flags, rc):
    """Subscribe to every topic in ``subscribe_topics`` after connecting.

    Installed as the MQTT client's ``on_connect`` callback.

    Args:
        client: The client instance that just connected; subscriptions are
            issued on this object rather than on a module-level global, so
            the callback does not depend on how the client variable is named.
        userdata: Unused.
        flags: Unused.
        rc: Connection result passed by the library; not inspected here.
    """
    for topic in subscribe_topics:
        client.subscribe(topic)
def on_msg(client, userdata, msg):
    """Cache the payload of an incoming MQTT message in ``DB``.

    Installed as the MQTT client's ``on_message`` callback. The payload is
    stored as text under the message's topic name.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    # Undecodable bytes become U+FFFD instead of raising UnicodeDecodeError,
    # so a single non-text payload cannot raise out of this callback.
    DB[msg.topic] = msg.payload.decode('UTF-8', errors='replace')
def publish(topic, payload):
    """Record *payload* as the cached value of *topic*, then send it out.

    The cache in ``DB`` is written first and the message is then published
    through the module-level ``mqtt_client`` with QoS 0 and the retain flag
    cleared.
    """
    DB.update({topic: payload})
    mqtt_client.publish(topic, payload, retain=False, qos=0)
class webBridgeHandler(BaseHTTPRequestHandler):
    """HTTP request handler exposing the MQTT cache and publisher.

    Only GET is handled. Every request must carry an ``Auth-Token`` header
    equal to ``TOKEN``.

    Routes:
        ``/sub/<topic>``
            Replies 200 with the value cached in ``DB`` for ``<topic>``, or
            the text ``NONE`` when nothing is cached.
        ``/pub/<topic>?payload=<value>``
            Publishes ``<value>`` to ``<topic>``, which must be listed in
            ``publish_topics``. The special payload ``T`` toggles: ``0`` is
            sent when the cached state is ``1``, otherwise ``1`` is sent.

    Error replies (status / body), kept as-is for existing clients:
        512 ``TOKEN ERROR``, 513 ``TOPIC ERROR``, 514 ``PAYLOAD ERROR``,
        500 ``GENERAL FAILURE`` for any other path.
    """

    @staticmethod
    def _tokens_match(supplied, expected):
        """Return True when *supplied* equals *expected*.

        A missing header (``None``) never matches. The comparison goes
        through ``hmac.compare_digest`` so that it is not a plain
        short-circuiting string comparison on attacker-supplied input.
        Both sides are encoded to bytes because ``compare_digest`` rejects
        non-ASCII ``str`` arguments.
        """
        if supplied is None:
            return False
        return hmac.compare_digest(
            supplied.encode('UTF-8'), expected.encode('UTF-8')
        )

    @staticmethod
    def _topic_from_path(path, prefix):
        """Return what follows the first occurrence of *prefix* in *path*.

        Splitting only once keeps topics that themselves contain *prefix*
        intact, and stops ``/pub/x/pub/lamp0`` from being read as the
        whitelisted topic ``lamp0``. When *prefix* does not occur, *path* is
        returned unchanged.
        """
        return path.split(prefix, 1)[-1]

    def do_response(self, status=200, content='OK'):
        """Send a complete ``text/plain`` reply.

        Args:
            status: HTTP status code to send.
            content: Response body as text; written UTF-8 encoded.
        """
        self.send_response(status)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.wfile.write(content.encode('UTF-8'))

    def do_GET(self):
        """Authenticate the request and dispatch it to the sub/pub route."""
        parse = urlparse(self.path)
        query = parse_qs(parse.query)

        query_token = self.headers.get('Auth-Token', None)
        if not self._tokens_match(query_token, TOKEN):
            self.do_response(status=512, content='TOKEN ERROR')
            return

        if parse.path.startswith('/sub'):
            topic = self._topic_from_path(parse.path, '/sub/')
            self.do_response(content=DB.get(topic, 'NONE'))
            return

        if parse.path.startswith('/pub'):
            topic = self._topic_from_path(parse.path, '/pub/')
            if topic not in publish_topics:
                self.do_response(status=513, content='TOPIC ERROR')
                return

            # parse_qs yields a list per key; a missing key gives None here.
            payload = query.get('payload', None)
            if not payload:
                self.do_response(status=514, content='PAYLOAD ERROR')
                return

            topic_content = payload[0]
            if topic_content == 'T':
                # Toggle request: flip the cached state; an unknown or
                # non-'1' state is treated as off and switched to '1'.
                topic_content = '0' if DB.get(topic, None) == '1' else '1'

            publish(topic, topic_content)
            self.do_response()
            return

        self.do_response(status=500, content='GENERAL FAILURE')
# --- MQTT side: connect and run the network loop in a background thread ----
# NOTE(review): paho-mqtt 2.x expects a callback_api_version argument to
# Client(); confirm which library version this script is pinned to.
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_conn
mqtt_client.on_message = on_msg
mqtt_client.username_pw_set(MQTTUSER, MQTTPASS)
mqtt_client.connect(MQTTSRV, MQTTPORT, 60)  # third argument: keepalive
mqtt_client.loop_start()

# --- HTTP side: serve requests on the main thread until interrupted --------
web_bridge = HTTPServer((WEBADDR, WEBPORT), webBridgeHandler)
try:
    web_bridge.serve_forever()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the bridge; fall through to cleanup.
    pass
finally:
    # Runs on every exit path, not just Ctrl-C. Disconnect before stopping
    # the loop thread so the broker receives a clean DISCONNECT.
    mqtt_client.disconnect()
    mqtt_client.loop_stop()
    web_bridge.server_close()
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment