Skip to content

Instantly share code, notes, and snippets.

@j0e1in
Last active April 21, 2018 19:45
Show Gist options
  • Save j0e1in/dcdc253defba4f575fb308ef4f2af8ce to your computer and use it in GitHub Desktop.
Save j0e1in/dcdc253defba4f575fb308ef4f2af8ce to your computer and use it in GitHub Desktop.
Run an async function / coroutine in a thread.
##########################################################
# Use cases
# 1. Start an async function in __init__
# 2. Start an endless (while loop) async function while current event loop is running
# 3. ...
#
# Limitations
# 1. Some libraries (eg. motor) cannot be executed using this method, because you can't control how event loop is retrieved. (See another method below)
#
# Usage:
# async def func(count, add=1)
# while True:
# count += add
# print(count)
# await asyncio.sleep(1)
#
# th = run_async_in_thread(func, 1, add=3)
#
# # Tip: restart a thread if it is dead
# while True:
# if not th.is_alive():
# th = run_async_in_thread(func, 1, add=3)
##########################################################
from threading import Thread
import asyncio
def run_async_in_thread(func, *args, **kwargs):
def run_async(func, loop, *args, **kwargs):
loop.run_until_complete(func(*args, **kwargs))
loop = asyncio.new_event_loop()
th = Thread(target=run_async, args=(func, loop, *args), kwargs=kwargs)
th.start()
return th
# Another option is to use `asyncio.run_coroutine_threadsafe`.
# This method is more robust to many situation,
# but it requires an additional step to get traceback.
import asyncio
import sys
import traceback
async def catch_traceback(fn, *args, **kwargs):
""" Wraps `fn`(coroutine) in order to preserve the
traceback of any kind of exception.
"""
try:
return await fn(*args, **kwargs)
except Exception:
raise sys.exc_info()[0](traceback.format_exc())
loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(
catch_traceback(func, 1, add=3), loop)
while True:
try:
res = future.exception(timeout=1) # try to retrive exception / result
except concurrent.futures.TimeoutError:
pass # skip and do other tasks first
else:
# handle exception / result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment