Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save bugcy013/d35a73fe85914c176425d637c06d2d7e to your computer and use it in GitHub Desktop.
Save bugcy013/d35a73fe85914c176425d637c06d2d7e to your computer and use it in GitHub Desktop.
Limit concurrency with Python asyncio
import asyncio
from typing import Coroutine, List, Sequence
def _limit_concurrency(
coroutines: Sequence[Coroutine], concurrency: int
) -> List[Coroutine]:
"""Decorate coroutines to limit concurrency.
Enforces a limit on the number of coroutines that can run concurrently in higher
level asyncio-compatible concurrency managers like asyncio.gather(coroutines) and
asyncio.as_completed(coroutines).
"""
semaphore = asyncio.Semaphore(concurrency)
async def with_concurrency_limit(coroutine: Coroutine) -> Coroutine:
async with semaphore:
return await coroutine
return [with_concurrency_limit(coroutine) for coroutine in coroutines]
async def sleep_for_seconds(seconds: float) -> None:
print(f"Going to sleep for {seconds} seconds...")
await asyncio.sleep(seconds)
print(f"Woke up after {seconds} seconds!")
async def main():
coroutines = [sleep_for_seconds(1), sleep_for_seconds(2), sleep_for_seconds(3)]
await asyncio.gather(*_limit_concurrency(coroutines, concurrency=2))
if __name__ == "__main__":
asyncio.run(main())
# Going to sleep for 1 seconds...
# Going to sleep for 2 seconds...
# Woke up after 1 seconds!
# Going to sleep for 3 seconds...
# Woke up after 2 seconds!
# Woke up after 3 seconds!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment