Skip to content

Instantly share code, notes, and snippets.

@harrisont
Last active April 13, 2020 12:10
Show Gist options
  • Save harrisont/146f6dcb1270a0f72adbb905d0ab7ee8 to your computer and use it in GitHub Desktop.
Save harrisont/146f6dcb1270a0f72adbb905d0ab7ee8 to your computer and use it in GitHub Desktop.
as_completed_with_max_concurrent: similar to asyncio.as_completed, but limits the number of futures running concurrently
import asyncio
import itertools
def as_completed_with_max_concurrent(futures, max_concurrent, loop=None, timeout=None):
    """Yield waiters for results as they complete, running at most `max_concurrent` at once.

    Tweaked version of `asyncio.as_completed` with the addition of the
    `max_concurrent` param. Only the first `max_concurrent` awaitables are
    started up front; each time one finishes, the next unstarted one is
    started from the completion callback.

    Args:
        futures: Iterable of futures and/or coroutines. Duplicates are
            dropped; the remaining ones are started in the order given.
        max_concurrent: Maximum number of awaitables that may be running at
            the same time. `None` means no limit.
        loop: Event loop used to wrap the awaitables in futures. Defaults to
            `asyncio.get_event_loop()`.
        timeout: Optional number of seconds, counted from the first
            iteration, after which every outstanding waiter raises
            `asyncio.TimeoutError`.

    Yields:
        One coroutine per distinct input. Awaiting it returns the result of
        the next awaitable to finish, re-raises that awaitable's exception,
        or raises `asyncio.TimeoutError` if the timeout expired first.

    Raises:
        TypeError: If `futures` is a single future or coroutine rather than
            an iterable of them.
        ValueError: If there is work to run but `max_concurrent` is less
            than 1 (nothing could ever be started).

    Because this is a generator function, the errors above are raised on the
    first iteration rather than at call time.
    """
    if isinstance(futures, asyncio.Future) or asyncio.iscoroutine(futures):
        raise TypeError("expect a list of futures, not %s" % type(futures).__name__)
    loop = loop if loop is not None else asyncio.get_event_loop()

    # De-duplicate while keeping the caller's order so that the start order is
    # deterministic (a plain set would start the awaitables in arbitrary order).
    pending = list(dict.fromkeys(futures))
    if pending and max_concurrent is not None and max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1, got %r" % (max_concurrent,))
    pending_iter = iter(pending)

    todo = set()
    # No `loop=` argument: it was removed from asyncio.Queue in Python 3.10.
    done = asyncio.Queue()
    timeout_handle = None

    def _on_timeout():
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.
        # Awaitables that were never started also have a waiter yielded for
        # them; give each a dummy value too so those waiters time out instead
        # of blocking forever.
        for unstarted in pending_iter:
            if asyncio.iscoroutine(unstarted):
                # It will never be run; close it to avoid a
                # "coroutine was never awaited" warning.
                unstarted.close()
            done.put_nowait(None)

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        # A slot just freed up: start the next unstarted awaitable, if any.
        # None is a safe sentinel because it is never a valid awaitable.
        next_future = next(pending_iter, None)
        if next_future is not None:
            _queue_future(next_future)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    def _queue_future(f):
        wrapped = asyncio.ensure_future(f, loop=loop)
        wrapped.add_done_callback(_on_completion)
        todo.add(wrapped)

    async def _wait_for_one():
        f = await done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise asyncio.TimeoutError
        return f.result()  # May raise f.exception().

    # Only queue the first `max_concurrent` futures initially.
    # The rest will be queued in `_on_completion`.
    for future in itertools.islice(pending_iter, max_concurrent):
        _queue_future(future)

    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)

    for _ in range(len(pending)):
        yield _wait_for_one()
import asyncio
from contextlib import closing
import random
import asyncio_helper
async def async_index_printer(index: int, *, min_delay: float = 1, max_delay: float = 3):
    """Print a start marker, sleep for a random delay, then return `index`.

    Args:
        index: Value printed in the start marker and returned unchanged.
        min_delay: Lower bound, in seconds, of the random sleep (default 1).
        max_delay: Upper bound, in seconds, of the random sleep (default 3).

    Returns:
        The `index` that was passed in.
    """
    print('start', index)
    # The random delay makes the demo tasks finish out of order.
    await asyncio.sleep(random.uniform(min_delay, max_delay))
    return index
async def async_index_printer_with_finisher():
    """Run five index printers, at most two at a time, printing each result as it completes."""
    pending = [async_index_printer(i) for i in range(5)]
    # Use as_completed_with_max_concurrent instead of asyncio.as_completed.
    for next_done in asyncio_helper.as_completed_with_max_concurrent(pending, max_concurrent=2):
        print('end', await next_done)
def main():
    """Run the demo coroutine to completion on a fresh event loop with debug mode on."""
    # asyncio.run() creates the loop, enables debug mode, runs the coroutine and
    # always closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on recent Python versions.
    asyncio.run(async_index_printer_with_finisher(), debug=True)
# Run the demo. There is no `__name__ == "__main__"` guard, so this also runs on import.
main()
import asyncio
from contextlib import closing
import random
async def async_index_printer(index: int, *, min_delay: float = 1, max_delay: float = 3):
    """Print a start marker, sleep for a random delay, then return `index`.

    Args:
        index: Value printed in the start marker and returned unchanged.
        min_delay: Lower bound, in seconds, of the random sleep (default 1).
        max_delay: Upper bound, in seconds, of the random sleep (default 3).

    Returns:
        The `index` that was passed in.
    """
    print('start', index)
    # The random delay makes the demo tasks finish out of order.
    await asyncio.sleep(random.uniform(min_delay, max_delay))
    return index
async def do_with_semaphore(semaphore: asyncio.Semaphore, future):
    """Await `future` while holding `semaphore`, and return its result.

    The semaphore is released whether `future` returns or raises.
    """
    await semaphore.acquire()
    try:
        outcome = await future
    finally:
        semaphore.release()
    return outcome
async def async_index_printer_with_finisher():
    """Run five index printers, throttled to two at a time by a semaphore, printing each result as it completes."""
    limiter = asyncio.Semaphore(2)
    throttled = []
    for i in range(5):
        # Each printer only runs once it has acquired the shared semaphore.
        throttled.append(do_with_semaphore(limiter, async_index_printer(i)))
    for next_done in asyncio.as_completed(throttled):
        print('end', await next_done)
def main():
    """Run the demo coroutine to completion on a fresh event loop with debug mode on."""
    # asyncio.run() creates the loop, enables debug mode, runs the coroutine and
    # always closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on recent Python versions.
    asyncio.run(async_index_printer_with_finisher(), debug=True)
# Run the demo. There is no `__name__ == "__main__"` guard, so this also runs on import.
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment