Last active
June 9, 2024 05:29
-
-
Save gustabot42/bbd4a5427a59980d98bb20315ff49d69 to your computer and use it in GitHub Desktop.
Asyncio Event With Result
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import collections | |
from asyncio import Event | |
from collections.abc import Awaitable | |
from collections.abc import Callable | |
from collections.abc import Iterable | |
from result import Err | |
from result import Result | |
class EventResult(Event): | |
"""Like asyncio.Event but with a result value.""" | |
def __init__(self): | |
self._waiters = collections.deque() | |
self._value: Result = None | |
def __repr__(self): | |
res = super().__repr__() | |
extra = "set" if self._value is not None else "unset" | |
if self._waiters: | |
extra = f"{extra}, waiters:{len(self._waiters)}" | |
return f"<{res[1:-1]} [{extra}]>" | |
def is_set(self): | |
"""Return True if and only if the internal flag is not set.""" | |
return self._value is not None | |
def set(self, value: Result): | |
"""Set the internal value. All coroutines waiting for it are awakened. | |
Coroutine that call wait() once value is set will not block at all. | |
""" | |
if self._value is None: | |
self._value = value | |
for fut in self._waiters: | |
if not fut.done(): | |
fut.set_result(True) | |
def clear(self): | |
"""Reset the internal value to None. Subsequently, coroutines calling | |
wait() will block until set() is called to set the internal value again. | |
""" | |
self._value = None | |
async def wait(self): | |
"""Block until the internal value is set. | |
If the internal value is set on entry, return value immediately. | |
Otherwise, block until another coroutine calls set() then return value. | |
""" | |
if self._value is not None: | |
return self._value | |
fut = self._get_loop().create_future() | |
self._waiters.append(fut) | |
try: | |
await fut | |
return self._value | |
finally: | |
self._waiters.remove(fut) | |
async def wait_for(self, timeout: float | None = None) -> Result: | |
try: | |
await asyncio.wait_for(self.wait(), timeout=timeout) | |
except TimeoutError: | |
return Err("TimeoutError") | |
return self._value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from collections.abc import Callable | |
import pytest | |
from result import Err | |
from result import Ok | |
from hermes.utils.asyncio import EventResult | |
async def delay_callable(delay: float, func: Callable, *func_args, **func_kwargs): | |
await asyncio.sleep(delay) | |
return func(*func_args, **func_kwargs) | |
def test_is_set(): | |
event = EventResult() | |
assert not event.is_set() | |
event.set(Ok("value")) | |
assert event.is_set() | |
def test_clear(): | |
event = EventResult() | |
event.set(Ok("value")) | |
event.clear() | |
assert not event.is_set() | |
@pytest.mark.asyncio() | |
async def test_wait(): | |
event = EventResult() | |
results = await asyncio.gather(delay_callable(0.1, event.set, Ok("value")), event.wait()) | |
assert results[1] == Ok("value") | |
@pytest.mark.asyncio() | |
async def test_wait_for(): | |
event = EventResult() | |
results = await asyncio.gather(delay_callable(0.1, event.set, Ok("value")), event.wait_for()) | |
assert results[1] == Ok("value") | |
@pytest.mark.asyncio() | |
async def test_wait_for_timeout(): | |
event = EventResult() | |
result = await event.wait_for(0.1) | |
assert result == Err("TimeoutError") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment