Created
September 19, 2022 14:04
-
-
Save XoseLluis/586b1f38527404a1f3147bccaba8e345 to your computer and use it in GitHub Desktop.
AsyncExecutor class to limit the number of python awaitables running at a given time
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from dataclasses import dataclass | |
import random | |
from typing import Any | |
import json | |
@dataclass | |
class AsyncAction: | |
future: asyncio.Future | |
awaitable_fn: Any # function that returns an awaitable (coroutine, a task...) | |
args: list[Any] | |
kwargs: list[Any] | |
class AsyncExecutor: | |
def __init__(self, event_loop: asyncio.AbstractEventLoop, max_running_actions: int): | |
self.event_loop = event_loop | |
self.max_running_actions = max_running_actions | |
self.running_counter = 0 | |
self.not_launched_actions = [] | |
def submit(self, awaitable_fn, *args, **kwargs) -> asyncio.Future: | |
""" | |
receives a function to be executed when there's one available slot. That function returns and awaitable | |
""" | |
future = self.event_loop.create_future() | |
action = AsyncAction(future, awaitable_fn, args, kwargs) | |
if self.running_counter < self.max_running_actions: | |
self.running_counter += 1 | |
# _run_action returns a coroutine, so if I'm not awaiting it need to run it as a task | |
#self._run_action(action) | |
asyncio.create_task(self._run_action(action)) | |
else: | |
self.not_launched_actions.append(action) | |
return future | |
async def _run_action(self, action: AsyncAction): | |
# self.running_counter += 1 | |
result = await action.awaitable_fn(*(action.args), **(action.kwargs)) | |
self._process_result(action, result) | |
def _process_result(self, action: AsyncAction, result: Any): | |
self.running_counter -= 1 | |
action.future.set_result(result) | |
if len(self.not_launched_actions): | |
self.running_counter += 1 | |
asyncio.create_task(self._run_action(self.not_launched_actions.pop(0))) | |
async def mock_download(url: str, delay: int): | |
print("starting mock download") | |
await asyncio.sleep(delay) | |
return url.upper() | |
def create_download_task(url: str, delay: int): | |
print(create_download_task.__name__) | |
return asyncio.get_running_loop().create_task(mock_download(url, delay)) | |
async def main(): | |
async_executor = AsyncExecutor(asyncio.get_running_loop(), 4) | |
futures = [] | |
for i in range(0,10): | |
delay = random.randint(1, 4) | |
if i % 2 == 0: | |
future = async_executor.submit(mock_download, f"www.jesoutienslapolice.fr/post_{i}", delay) | |
else: | |
future = async_executor.submit(create_download_task, f"www.jesoutienslapolice.fr/post_{i}", delay) | |
future.add_done_callback(lambda fut: print(f"{fut.result()} done")) | |
futures.append(future) | |
future = async_executor.submit(mock_download, f"www.jesoutienslapolice.fr/post_{i}", delay) | |
future.add_done_callback(lambda fut: print(f"{fut.result()} done")) | |
futures.append(future) | |
print(f"{len(futures)} submitted") | |
results = await asyncio.gather(*futures) | |
print(f"all finished: {json.dumps(results, indent=4)}") | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment