Last active
March 26, 2024 15:21
-
-
Save jramseygreen/a80c9788cfeee0df130195fee948339b to your computer and use it in GitHub Desktop.
Implement SSE for flask
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Blueprint, jsonify | |
from sse import sse | |
# required line at the top of every blueprint file | |
api = Blueprint("api", __name__) # match variable name and first arg to file name | |
# register more blueprints here to further split up the api | |
# e.g. | |
# api.register_blueprint(blueprint, url_prefix='/users') | |
# would cascade through /api/users | |
# api routes when hitting /api | |
@api.route("/") | |
def heartbeat(): | |
return jsonify({"status": "healthy"}) | |
# if you wanted you could supply a unique id to the listen method to send messages to specific clients | |
@api.route("/listen") | |
def listen(): | |
client, stream = sse.listen() | |
return stream | |
# this will ping all sse clients | |
@api.route('/ping') | |
def pingall(): | |
sse.announce('command', {'data': 'data'}) | |
return {}, 200 | |
# this will ping all sse clients | |
@api.route('/ping/<int:client>') | |
def ping(client: int): | |
sse.send(client, 'command', {'data': 'data'}) | |
return {}, 200 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from queue import Queue | |
from flask import Response | |
from json import dumps | |
class __Sse: | |
def __init__(self): | |
self.__listeners = {} | |
def listen(self, client: int=None) -> Response: | |
q = Queue(maxsize=5) | |
if not client: | |
client = id(q) | |
self.__listeners[client] = q | |
# generator for flask response | |
def stream(): | |
while True: | |
msg = q.get() | |
if msg == 'stop': | |
break | |
yield 'event: message\ndata: ' + dumps(msg) + '\n\n' | |
response = Response(stream(), content_type='text/event-stream') | |
response.headers['Cache-Control'] = 'no-cache' | |
response.headers['X-Accel-Buffering'] = 'no' | |
return client, response | |
def announce(self, event: str, data): | |
for id in self.__listeners: | |
self.send(id, event, data) | |
def send(self, client: int, event: str, data): | |
msg = 'event: ' + event + '\ndata: ' + dumps(data) + '\n\n' | |
try: | |
self.__listeners[client].put_nowait(msg) | |
except queue.Full: | |
del self.__listeners[client] | |
def stop(self): | |
for id in list(self.__listeners): | |
self.__listeners[id].put_nowait('stop') | |
sse = __Sse() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment