Skip to content

Instantly share code, notes, and snippets.

@aredden
Last active September 26, 2022 01:39
Show Gist options
  • Save aredden/36548ce4418f11569fbff3859ffccf02 to your computer and use it in GitHub Desktop.
Save aredden/36548ce4418f11569fbff3859ffccf02 to your computer and use it in GitHub Desktop.
python parallel-limit style async generator function
from typing import AsyncGenerator, List, Coroutine, Any, Callable, Union
import asyncio
from loguru import logger
async def async_limit_iterator(
limit=8,
jobs: List[Any] = [],
coroutine_fn: Callable[[Any], Coroutine[Any, None, None]] = None,
throw_exceptions: bool = False,
) -> AsyncGenerator[Union[Any, None], None]:
"""
Run a list of jobs in parallel, limiting concurrent jobs to `limit`.
Each iteration, 'coroutine_fn' will be called with one of the jobs from `jobs`
If the result of some job is an error, will return None if throw_exceptions is false (the default),
Args:
limit: number of concurrent jobs
jobs: list of jobs
coroutine_fn: coroutine function to run each job
throw_exceptions: if job raises error, and if false, will return None
Returns:
AsyncGenerator of results
"""
coroutines = []
def inject_job():
if len(jobs) > 0:
coroutines.append(asyncio.create_task(coroutine_fn(jobs.pop(0))))
for _ in range(limit):
inject_job()
while coroutines:
done, coroutines = await asyncio.wait(
coroutines, return_when=asyncio.FIRST_COMPLETED
)
coroutines = list(coroutines)
for task in done:
try:
yield await task
except Exception as e:
if throw_exceptions:
raise e
else:
logger.error(f"Error ignored: {e}")
yield None
inject_job()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment