Last active
September 10, 2024 11:38
-
-
Save SF-300/aafe129fcf766ccdea4ca20cfee5cfba to your computer and use it in GitHub Desktop.
Simple signal pattern implementation with asyncio support.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import weakref | |
import inspect | |
from asyncio import Future | |
from typing import Any, Callable, TypeVar, Protocol, Awaitable, Generator | |
__all__ = "Listenable", "Signal", | |
_T = TypeVar("_T", covariant=True) | |
class Listenable(Awaitable[_T], Protocol[_T]): | |
def on(self, slot: Callable[[_T], Any], weak: bool = False) -> None: | |
... | |
def off(self, slot: Callable[[_T], Any]) -> None: | |
... | |
class Signal(Listenable[_T]): | |
def __init__(self): | |
self._slots = dict() | |
self._f: None | Future[_T] = None | |
def on(self, slot: Callable[[_T], Any], weak: bool = False) -> None: | |
slot_id = id(slot) | |
if slot_id in self._slots: | |
return | |
if weak: | |
def cleanup(_): | |
self._slots.pop(slot_id, None) | |
slot = weakref.proxy(slot, cleanup) | |
self._slots[slot_id] = slot | |
def off(self, slot: Callable[[_T], Any]) -> None: | |
slot_id = id(slot) | |
try: | |
del self._slots[slot_id] | |
except KeyError: | |
pass | |
async def __call__(self, value: _T) -> _T: | |
errors = [] | |
for slot in tuple(self._slots.values()): | |
try: | |
result = slot(value) | |
if not inspect.isawaitable(result): | |
continue | |
await result | |
except Exception as e: | |
errors.append(e) | |
if errors: | |
raise ExceptionGroup("Some slots raised exceptions while signal was being fired!", errors) | |
return value | |
def __await__(self) -> Generator[Any, Any, _T]: | |
if self._f is None: | |
f = Future() | |
def slot(value): | |
self._f = None | |
f.set_result(value) | |
self.off(slot) | |
self.on(slot) | |
assert isinstance(self._f, Future) | |
return (yield from self._f) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment