Last active
September 26, 2022 01:39
-
-
Save aredden/36548ce4418f11569fbff3859ffccf02 to your computer and use it in GitHub Desktop.
python parallel-limit style async generator function
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import AsyncGenerator, List, Coroutine, Any, Callable, Union | |
import asyncio | |
from loguru import logger | |
async def async_limit_iterator( | |
limit=8, | |
jobs: List[Any] = [], | |
coroutine_fn: Callable[[Any], Coroutine[Any, None, None]] = None, | |
throw_exceptions: bool = False, | |
) -> AsyncGenerator[Union[Any, None], None]: | |
""" | |
Run a list of jobs in parallel, limiting concurrent jobs to `limit`. | |
Each iteration, 'coroutine_fn' will be called with one of the jobs from `jobs` | |
If the result of some job is an error, will return None if throw_exceptions is false (the default), | |
Args: | |
limit: number of concurrent jobs | |
jobs: list of jobs | |
coroutine_fn: coroutine function to run each job | |
throw_exceptions: if job raises error, and if false, will return None | |
Returns: | |
AsyncGenerator of results | |
""" | |
coroutines = [] | |
def inject_job(): | |
if len(jobs) > 0: | |
coroutines.append(asyncio.create_task(coroutine_fn(jobs.pop(0)))) | |
for _ in range(limit): | |
inject_job() | |
while coroutines: | |
done, coroutines = await asyncio.wait( | |
coroutines, return_when=asyncio.FIRST_COMPLETED | |
) | |
coroutines = list(coroutines) | |
for task in done: | |
try: | |
yield await task | |
except Exception as e: | |
if throw_exceptions: | |
raise e | |
else: | |
logger.error(f"Error ignored: {e}") | |
yield None | |
inject_job() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment