Last active
April 15, 2021 16:24
-
-
Save timhughes/57d166c12ac5deb8930e1d9d06165ad6 to your computer and use it in GitHub Desktop.
MicroPython basic web server example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import gc | |
import logging | |
import micropython | |
try: | |
import ujson as json | |
except ImportError: | |
import json | |
try: | |
import uasyncio as asyncio | |
except ImportError: | |
import asyncio | |
try: | |
import utime as time | |
except: | |
import time | |
# Reason phrases for the HTTP status line, keyed by numeric status code.
# Redirect and other common codes are included so that responses using them
# can be given a proper reason phrase.
STATUS_MESSAGES = {
    200: "OK",
    201: "Created",
    204: "No Content",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    400: "Bad Request",
    401: "Unauthorized",
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    500: "Internal Server Error",
    501: "Not Implemented",
}
class Request:
    """Plain container for the parsed parts of one HTTP request.

    Attributes:
        method: Request method taken from the request line.
        uri: Request target taken from the request line (the ``path``
            constructor argument).
        version: Protocol version token taken from the request line.
        headers: Header mapping supplied by the caller.
    """

    def __init__(self, method, path, version, headers):
        # Note the rename: the ``path`` argument is exposed as ``uri``.
        self.method, self.uri = method, path
        self.version, self.headers = version, headers
class Response:
    """Description of an HTTP response: body, status and headers.

    Attributes:
        response: Response body, or ``None`` when not set yet.
        status: Numeric HTTP status code (defaults to 200).
        headers: Mapping of extra header names to values, or ``None``.
        mimetype: Stored as given; this class does not interpret it.
        content_type: Value intended for the ``Content-Type`` header, or
            ``None`` when not set.
    """

    def __init__(
        self,
        response=None,
        status=200,
        headers=None,
        mimetype=None,
        content_type=None,
    ):
        self.response = response
        self.status = status
        self.headers = headers
        self.mimetype = mimetype
        self.content_type = content_type

    def redirect(self, location, status=302):
        """Turn this response into a redirect and return it.

        Args:
            location: Target placed in the ``Location`` header.
            status: Redirect status code to use (defaults to 302).

        Returns:
            This same ``Response`` instance, so the call can be returned
            directly from a handler.
        """
        # Copy so a headers dict supplied by the caller is not mutated.
        headers = dict(self.headers) if self.headers else {}
        headers["Location"] = location
        self.headers = headers
        self.status = status
        if self.response is None:
            # Give the redirect an empty body so its length can be computed.
            self.response = ""
        return self
class WebApp:
    """Small asyncio HTTP application that dispatches requests by regex route.

    Handlers are registered with the :meth:`route` decorator and are called
    with no arguments. Their return value is normalised by
    :meth:`make_response` and written to the client by :meth:`send_response`.
    """

    request_class = Request
    response_class = Response

    def __init__(self, name="WEB"):
        """Create an application.

        Args:
            name: Application name; also used as the default logger name.
        """
        self.name = name
        self._routes = []
        self._server = None
        # Defaults so serve() and _handle() work even when run() was not the
        # entry point; run() overrides all of these.
        self.log = logging.getLogger(name)
        self.debug = False
        self._host = None
        self._port = None
        self._backlog = 10

    def _get_handler(self, req):
        """Return the first handler whose pattern and methods match *req*.

        Every registered route is tried in registration order; ``None`` is
        returned only after all of them failed to match.
        """
        for route in self._routes:
            if route["pattern"].match(req.uri) and req.method in route["methods"]:
                return route["handler"]
        return None

    def route(self, pattern, methods=("GET",)):
        """Return a decorator registering a handler for *pattern*.

        Args:
            pattern: Regular expression matched from the start of the request
                target. A trailing ``$`` is appended when missing so the whole
                target has to match.
            methods: Collection of HTTP methods the handler accepts.

        Returns:
            A decorator that records the handler and returns it unchanged.
        """
        if not pattern.endswith('$'):
            pattern = pattern + '$'
        compiled = re.compile(pattern)

        def _route(f):
            self._routes.append(
                {"pattern": compiled, "handler": f, "methods": methods}
            )
            return f

        return _route

    async def _read_headers(self, reader):
        """Read header lines up to the blank line and return them as a dict.

        Header names are lower-cased. Only the first ``:`` separates name and
        value, so values may themselves contain colons.

        Raises:
            ValueError: If a header line has no ``:`` or cannot be decoded.
        """
        headers = {}
        while True:
            line = await reader.readline()
            if not line.strip():
                # End of stream or the blank line that ends the header block.
                break
            name, sep, value = line.decode().partition(":")
            if not sep:
                raise ValueError("malformed header line")
            headers[name.strip().lower()] = value.strip()
        return headers

    async def _close(self, writer):
        """Close *writer* and wait until the underlying transport is released."""
        writer.close()
        try:
            # close() alone does not release the socket on MicroPython;
            # wait_closed() is what completes the shutdown.
            await writer.wait_closed()
        except OSError as exc:
            self.log.debug("error while closing connection: %s", exc)

    async def _handle(self, reader, writer):
        """Serve a single request on a freshly accepted connection.

        Reads the request line and headers, dispatches to the matching
        handler (404 when none matches, 400 when the request cannot be
        parsed) and sends the response. Errors are logged rather than raised,
        and the connection is always closed.
        """
        finished = False
        peername = None
        if self.debug:
            micropython.mem_info()
        try:
            peer = writer.get_extra_info("peername")
            if peer:
                peername = peer[0]
            request_line = await reader.readline()
            if not request_line.strip():
                # The client went away without sending a request.
                return
            try:
                method, path, version = request_line.decode().split()
                headers = await self._read_headers(reader)
            except ValueError:
                rv = self.response_class(STATUS_MESSAGES[400], status=400)
            else:
                req = self.request_class(method, path, version, headers)
                self.log.info('%.3f %s "%s" "%s %s"', time.time(), peername, headers.get("user-agent"), method, path)
                handler = self._get_handler(req)
                if handler:
                    rv = handler()
                else:
                    rv = self.response_class(STATUS_MESSAGES[404], status=404)
            resp = self.make_response(rv)
            finished = await self.send_response(writer, resp)
        except Exception as exc:
            self.log.error('%.3f "%s" "%s"', time.time(), peername, exc)
        finally:
            if not finished:
                await self._close(writer)
            gc.collect()

    async def send_response(self, writer, resp):
        """Write *resp* to *writer*, close the connection and return ``True``.

        The body is encoded before ``Content-Length`` is computed so the
        header counts bytes rather than characters. ``Content-Type`` falls
        back to ``resp.mimetype`` and then ``text/html`` when unset, and
        statuses missing from ``STATUS_MESSAGES`` get a generic reason phrase.
        """
        body = resp.response
        if body is None:
            body = b""
        elif isinstance(body, str):
            body = body.encode()
        elif not isinstance(body, (bytes, bytearray)):
            body = str(body).encode()

        reason = STATUS_MESSAGES.get(resp.status, "Unknown")
        head = ["HTTP/1.1 %s %s\r\n" % (resp.status, reason)]
        seen = set()
        for k, v in (resp.headers or {}).items():
            seen.add(k.lower())
            head.append("%s: %s\r\n" % (k, v))
        # Do not duplicate headers the handler already supplied explicitly.
        if "content-type" not in seen:
            content_type = resp.content_type or resp.mimetype or "text/html"
            head.append("Content-Type: %s\r\n" % content_type)
        if "content-length" not in seen:
            head.append("Content-Length: %d\r\n" % len(body))
        head.append("\r\n")

        writer.write("".join(head).encode())
        writer.write(body)
        await writer.drain()
        await self._close(writer)
        return True

    def make_response(self, rv):
        """Convert a handler return value into a ``response_class`` instance.

        Args:
            rv: A ``response_class`` instance (returned unchanged), a ``str``
                or bytes body, or a ``dict`` that is serialised to JSON.

        Raises:
            TypeError: If *rv* is ``None`` or of an unsupported type.
        """
        if rv is None:
            raise TypeError("The view function did not work")
        if isinstance(rv, self.response_class):
            return rv
        if isinstance(rv, dict):
            return self.response_class(
                json.dumps(rv), status=200, content_type="application/json"
            )
        if isinstance(rv, (str, bytes, bytearray)):
            return self.response_class(rv, status=200)
        raise TypeError(
            "The view function returned an unsupported type: %s" % type(rv)
        )

    async def serve(self, host, port):
        """Start listening on *host*:*port* and keep the server object."""
        self.log.info('Starting async server on %s:%s...', host, port)
        # backlog is keyword-only in CPython's asyncio.start_server and an
        # ordinary keyword parameter on MicroPython, so pass it by name.
        self._server = await asyncio.start_server(
            self._handle, host, port, backlog=self._backlog
        )

    async def run(self, host="127.0.0.1", port=8081, debug=False, log=None, backlog=10):
        """Configure the application and start the server.

        Args:
            host: Address to bind to.
            port: TCP port to listen on.
            debug: When true, sets the default logger to DEBUG and prints
                memory information for every request.
            log: Logger to use; a logger named after the app is created when
                omitted.
            backlog: Listen backlog passed to the server.

        This coroutine returns once the server has been started; the caller
        is responsible for keeping the event loop running.
        """
        gc.collect()
        if log is None:
            log = logging.getLogger(self.name)
            if debug:
                log.setLevel(logging.DEBUG)
        self.log = log
        self.debug = debug
        self._host = host
        self._port = port
        self._backlog = backlog
        await self.serve(host, port)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
try: | |
import uasyncio as asyncio | |
except ImportError: | |
import asyncio | |
from web_goldfish import app | |
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("gfweb")


async def main():
    """Start the web application and keep the event loop alive."""
    await app.run(host="0.0.0.0", port=8080, log=log, debug=False)
    # NOTE(review): app.run() appears to return as soon as the server has
    # been started. Staying inside main() keeps the loop that owns the server
    # running; on CPython asyncio.run() closes its loop when main() returns,
    # so a later get_event_loop().run_forever() cannot keep serving.
    while True:
        await asyncio.sleep(3600)


if __name__ == '__main__':
    asyncio.run(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from goldfishweb import WebApp, Response | |
# Application instance the route decorators below register handlers on.
app = WebApp(__name__)


# The pattern must be "/" rather than "": an empty pattern would only match
# an empty request path, and request paths start with "/".
@app.route("/", methods=["GET", "POST"])
def index():
    """Serve the example landing page as HTML."""
    htmldoc = """<!DOCTYPE html>
<html>
<head>
<title>Microdot Example Page</title>
</head>
<body>
<div>
<h1>Microdot Example Page</h1>
<p>Hello from Microdot!</p>
<p>See the <a href='details'>details</a></p>
</div>
</body>
</html>
"""
    return Response(response=htmldoc, content_type="text/html")
@app.route("/details")
def details():
    """Serve the static "details" page as HTML with status 200."""
    body = """<html>
<head>
<title>Hello World!</title>
</head>
<body>
<p>We have run out of details</p>
<p>Lets checkout <a href="redirect">this page</a> instead</p>
</body>
</html>
"""
    # Build the response in one step instead of assigning fields afterwards.
    return Response(response=body, status=200, content_type="text/html")
@app.route("/redirect")
def redirect():
    """Return the result of redirecting a fresh response to ``/``."""
    # NOTE(review): relies on Response providing a redirect() method --
    # confirm against the goldfishweb module in use.
    return Response().redirect('/')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment