Last active
February 8, 2022 15:46
-
-
Save vytas7/f40c5299b0ca6c85cddceef01c057372 to your computer and use it in GitHub Desktop.
Hybrid ASGI/WSGI app in the same process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import asyncio | |
import logging | |
import threading | |
import uuid | |
import falcon | |
import falcon.asgi | |
import gunicorn.app.base | |
import uvicorn | |
# NOTE(vytas): Useful since ASGI otherwise has nothing like wsgierrors. | |
logging.basicConfig( | |
format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO) | |
class RequestID: | |
def process_request(self, req, resp): | |
req.context.request_id = str(uuid.uuid4()) | |
async def process_request_async(self, req, resp): | |
self.process_request(req, resp) | |
class Hub: | |
TIMEOUT = 5.0 | |
def __init__(self): | |
self._loop = None | |
self._queues = set() | |
async def process_startup(self, scope, event): | |
# NOTE: Another way to get Uvicorn's loop is manually creating one and | |
# passing it to Uvicorn: | |
# https://github.com/encode/uvicorn/issues/706#issuecomment-652220153 | |
self._loop = asyncio.get_running_loop() | |
async def _enqueue(self, message): | |
for queue in self._queues: | |
await queue.put(message) | |
async def events(self, request_id): | |
logging.info(f'<{request_id}>: SSE emitter starting') | |
queue = asyncio.Queue() | |
self._queues.add(queue) | |
try: | |
while True: | |
try: | |
message = await asyncio.wait_for( | |
queue.get(), timeout=self.TIMEOUT) | |
logging.info(f'<{request_id}>: ==> {message}') | |
yield falcon.asgi.SSEvent(json=message, retry=5000) | |
except asyncio.TimeoutError: | |
yield falcon.asgi.SSEvent() | |
finally: | |
self._queues.discard(queue) | |
logging.info(f'<{request_id}>: SSE emitter exiting') | |
def broadcast(self, message): | |
if self._loop is not None: | |
asyncio.run_coroutine_threadsafe( | |
self._enqueue(message), self._loop) | |
else: | |
logging.warning(f'Cannot broadcast {message}: no known loop yet') | |
class EventStream: | |
def __init__(self, hub): | |
self._hub = hub | |
async def on_get(self, req, resp): | |
resp.sse = self._hub.events(req.context.request_id) | |
class SyncResource: | |
def __init__(self, hub): | |
self._hub = hub | |
def on_get(self, req, resp): | |
resp.media = {'greeting': 'Hello!'} | |
def on_post(self, req, resp): | |
self._hub.broadcast(req.get_media()) | |
resp.status = falcon.HTTP_ACCEPTED | |
class HybridApplication(gunicorn.app.base.BaseApplication): | |
hub = Hub() | |
@classmethod | |
def post_fork(cls, arbiter, worker): | |
asgi_app = falcon.asgi.App(middleware=[RequestID(), cls.hub]) | |
asgi_app.add_route('/sse', EventStream(cls.hub)) | |
# TODO: Use Gunicorn's hooks to properly join() instead of daemon=True. | |
uvicorn_thread = threading.Thread( | |
target=uvicorn.run, | |
args=(asgi_app,), | |
kwargs=dict(host='127.0.0.1', port=8002, log_level='debug'), | |
daemon=True) | |
uvicorn_thread.start() | |
def __init__(self): | |
self.options = { | |
'bind': '127.0.0.1:8000', | |
'post_fork': self.post_fork, | |
'workers': 1, | |
'threads': 4, | |
} | |
self.application = app = falcon.App() | |
app.add_route('/messages', SyncResource(self.hub)) | |
super().__init__() | |
def load_config(self): | |
config = {key: value for key, value in self.options.items() | |
if key in self.cfg.settings and value is not None} | |
for key, value in config.items(): | |
self.cfg.set(key.lower(), value) | |
def load(self): | |
return self.application | |
if __name__ == '__main__': | |
HybridApplication().run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment