Skip to content

Instantly share code, notes, and snippets.

@mikeckennedy
Last active April 2, 2023 07:02
Show Gist options
  • Save mikeckennedy/c76739766ce072f980aa4df1a6dc9516 to your computer and use it in GitHub Desktop.
Save mikeckennedy/c76739766ce072f980aa4df1a6dc9516 to your computer and use it in GitHub Desktop.
Convert an async method to a synchronous one.
import asyncio
import functools
from typing import Any, Coroutine
loop = asyncio.new_event_loop()
def run(async_coroutine: Coroutine[Any, Any, Any]):
"""
Convert an async method to a synchronous one.
Example:
async def some_async_method(x, y): ...
result = syncify.run( some_async_method(1, 2) )
Args:
async_coroutine ():
Returns: The value returned by `async_coroutine`
"""
return loop.run_until_complete(async_coroutine)
@mikeckennedy
Copy link
Author

This often works well for creating a sync and async API. For example, a data layer using an async db that can be used in an async web app as well as a sync utility script.

def get_redirect(short_url: str) -> Optional[CmsRedirect]:
    return syncify.run(get_redirect_async(short_url))


async def get_redirect_async(short_url: str) -> Optional[CmsRedirect]:
    if not short_url or not short_url.strip():
        return None

    return await CmsRedirect.find_one(CmsRedirect.short_url == short_url.strip().lower())

Here you can call get_redirect('/terms-of-service') in your simple script or sync code. But await get_redirect_async('/terms-of-service') in code that is async capable.

@mikeckennedy
Copy link
Author

Here is the way more intense version I'm actually using based on uvloop:

https://gist.github.com/mikeckennedy/033ad92c165a9041fafc5c429e6c3c28

@eevmanu
Copy link

eevmanu commented Mar 7, 2022

@mikeckennedy thanks for this example.

One quick question: Do you think this behaviour would be possible to replicate in a more mild version with only asyncio high-level APIs? with asyncio.wait(...) and return_when=ALL_COMPLETED argument, or any inmediate trade-off I should be aware to prefer low-level APIs (asyncio.loop.run_until_complete) or even threading (in your more intense version)?. Thanks.

@mikeckennedy
Copy link
Author

You could definitely implement it with: asyncio.wait(...) and return_when=ALL_COMPLETED. But if you encounter a long running task, it could start to back up the entire async flow. This way as soon as a task gets done, it gets dropped into the done queue ineffectively is finished for the process. I do realize, that if the first task is the slow running task it's the same thing here. But at least, in a slightly more complicated way above, as soon as a task is done in order, it's available for pick up. And the other task can still be running. So if you have many short tasks, they won't affect the latency as much.

That's what I was thinking anyway. :-)

@eevmanu
Copy link

eevmanu commented Mar 23, 2022

Thanks for your answer @mikeckennedy, long and slow running tasks are important to consider.

I had a preference to high-level APIs because I was planning to teach this approach (syncify) but with the least possible complexity.

@mikeckennedy
Copy link
Author

You're welcome. You might look into what they are suggesting in this Stackoverflow answer:

https://stackoverflow.com/questions/39400885/await-for-any-future-asyncio/39407084#39407084

async def main():
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    while futures:
        done, futures = await asyncio.wait(futures, 
            loop=loop, return_when=asyncio.FIRST_COMPLETED)  
        for f in done:
            await f

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment