Last active
April 16, 2023 15:12
-
-
Save adamsvoboda/76d353d3d272d6c828af2319b10ae7fd to your computer and use it in GitHub Desktop.
asynchronous rethinkdb change feeds with django channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from rethinkdb import r, RqlRuntimeError | |
from channels import Group | |
r.set_loop_type('asyncio') # RethinkDB's driver must use asyncio for the loop type. | |
def create_changefeed_monitor(feeds): | |
""" | |
Creates a new asyncio event loop and starts the parallel execution of attach_changefeed | |
for each table we want to monitor. Easily extended to accept whatever arguements you need. | |
Once your routing is setup, this function could be called like: | |
Channel('create-changefeed-monitor').send({'tables': ['table1', 'table2']}) | |
Resources: | |
https://www.wordfugue.com/using-django-channels-email-sending-queue | |
https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future | |
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.BaseEventLoop.run_until_complete | |
""" | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
tasks = [] | |
for table in feeds['tables']: | |
tasks.append(asyncio.ensure_future(changefeed_attach(table))) | |
loop.run_until_complete(asyncio.wait(tasks)) | |
loop.close() | |
# RethinkDB Changefeeds | |
# https://rethinkdb.com/docs/changefeeds/python/ | |
async def changefeed_attach(table): | |
""" | |
Creates a new RethinkDB connection for each instance of the function, attaches to | |
the changefeed for the table, awaits for any changes to be emitted from the | |
AsyncioCursor returned by RethinkDB! | |
The python RethinkDB driver is not thread-safe, but I don't think creating new | |
connections for each changefeed monitor is a big deal. "RethinkDB doesn't create a | |
thread per connection, so once established, you can have tens of thousands of | |
connections with very little overhead." | |
It may be worth investigating a Queue.Queue() object and doing something like a | |
RethinkDB connection pool for projects that use a lot of RethinkDB. | |
""" | |
try: | |
conn = await r.connect('127.0.0.1', 28015, db='your_database') | |
feed = await r.table(table).changes().run(conn) | |
while await feed.fetch_next(): | |
change = await feed.next() | |
# Do something with the change. I'm broadcasting it out to a group for testing. | |
Group('changefeed-' + table).send({ | |
"type": "changefeed", | |
"table": table, | |
"old": change['old_val'], | |
"new": change['new_val'], | |
}) | |
except RqlRuntimeError as e: | |
# TODO: Implement changefeed error recovery or task monitoring | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment