Skip to content

Instantly share code, notes, and snippets.

@SF-300
Last active September 10, 2024 11:38
Show Gist options
  • Save SF-300/aafe129fcf766ccdea4ca20cfee5cfba to your computer and use it in GitHub Desktop.
Save SF-300/aafe129fcf766ccdea4ca20cfee5cfba to your computer and use it in GitHub Desktop.
Simple signal pattern implementation with asyncio support.
import weakref
import inspect
from asyncio import Future
from typing import Any, Callable, TypeVar, Protocol, Awaitable, Generator
# Public names exported by a star-import of this module.
__all__ = "Listenable", "Signal",
# Type of the value handed to slots and produced by awaiting a listenable;
# declared covariant.
_T = TypeVar("_T", covariant=True)
class Listenable(Awaitable[_T], Protocol[_T]):
    """Structural interface of an awaitable event source carrying ``_T`` values.

    An implementation can be awaited and lets callers attach and detach
    callback "slots" that accept a single value of type ``_T``.
    """

    def on(
        self,
        slot: Callable[[_T], Any],
        weak: bool = False,
    ) -> None:
        """Attach *slot*; *weak* asks that it be held through a weak reference."""

    def off(
        self,
        slot: Callable[[_T], Any],
    ) -> None:
        """Detach a *slot* that was previously attached with :meth:`on`."""
class Signal(Listenable[_T]):
    """Signal that delivers a fired value to registered slots and to awaiters.

    Slots are callables registered with :meth:`on`; they may be plain
    functions or return awaitables.  Firing the signal with
    ``await signal(value)`` calls every slot with ``value`` and awaits any
    awaitable a slot returns.  Awaiting the signal itself
    (``value = await signal``) suspends until the next time it is fired.
    """

    def __init__(self):
        # Maps a slot key (see ``_slot_key``) to a zero-argument resolver that
        # returns the live slot, or ``None`` once a weakly held slot has died.
        self._slots = dict()
        # Future shared by everyone currently awaiting the signal, if any.
        self._f: None | Future[_T] = None

    @staticmethod
    def _slot_key(slot):
        """Return a registry key for *slot* that is stable across lookups."""
        # ``obj.method`` builds a fresh bound-method object on every attribute
        # access, so ``id(obj.method)`` differs between ``on`` and ``off``.
        # Identify a bound method by its instance and underlying function.
        if inspect.ismethod(slot):
            return id(slot.__self__), id(slot.__func__)
        return id(slot)

    def on(self, slot: Callable[[_T], Any], weak: bool = False) -> None:
        """Register *slot* to be called with every fired value.

        Registering a slot that is already registered is a no-op.

        Args:
            slot: Callable taking the fired value; it may return an awaitable.
            weak: When true, keep only a weak reference to *slot* and drop
                the registration automatically once *slot* is collected.

        Raises:
            TypeError: If ``weak`` is true and *slot* cannot be weakly
                referenced.
        """
        key = self._slot_key(slot)
        if key in self._slots:
            return
        if not weak:
            self._slots[key] = lambda: slot
            return

        def cleanup(ref):
            # Only remove the entry if it still belongs to the dead reference.
            if self._slots.get(key) is ref:
                del self._slots[key]

        # A plain weak reference (or proxy) to a bound method dies at once
        # because the bound-method object is a temporary; WeakMethod tracks
        # the instance and the function instead.
        if inspect.ismethod(slot):
            self._slots[key] = weakref.WeakMethod(slot, cleanup)
        else:
            self._slots[key] = weakref.ref(slot, cleanup)

    def off(self, slot: Callable[[_T], Any]) -> None:
        """Unregister *slot*; unknown slots are silently ignored."""
        self._slots.pop(self._slot_key(slot), None)

    async def __call__(self, value: _T) -> _T:
        """Fire the signal, passing *value* to every registered slot.

        Slots are called one after another; awaitable results are awaited
        before moving on.  A failing slot does not prevent the remaining
        slots from running.

        Returns:
            The *value* that was fired.

        Raises:
            ExceptionGroup: If one or more slots raised; it contains every
                exception collected while firing.
        """
        errors = []
        # Iterate over a snapshot so slots may call on()/off() while running.
        for resolve in tuple(self._slots.values()):
            slot = resolve()
            if slot is None:
                # Weakly held slot already died; nothing to call.
                continue
            try:
                result = slot(value)
                if inspect.isawaitable(result):
                    await result
            except Exception as e:
                errors.append(e)
        if errors:
            raise ExceptionGroup("Some slots raised exceptions while signal was being fired!", errors)
        return value

    def __await__(self) -> Generator[Any, Any, _T]:
        """Wait for the next firing of the signal and return its value.

        All awaiters waiting at the same time share a single future.
        """
        f = self._f
        # Start a new wait when there is none pending, or when the previous
        # future was already completed/cancelled (e.g. its waiter timed out).
        if f is None or f.done():
            f = self._f = Future()

            def slot(value):
                # One-shot slot: detach first, then resolve the waiters.
                self.off(slot)
                if self._f is f:
                    self._f = None
                # The future may have been cancelled by its waiter meanwhile.
                if not f.done():
                    f.set_result(value)

            self.on(slot)
            # Drop the slot as well when the wait ends without a firing
            # (cancellation); off() is idempotent, so a double call is fine.
            f.add_done_callback(lambda _: self.off(slot))
        return (yield from f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment