Skip to content

Instantly share code, notes, and snippets.

@sstur
Created July 15, 2024 22:35
Show Gist options
  • Save sstur/5208d6709c1597b0700739d4daa06e59 to your computer and use it in GitHub Desktop.
Save sstur/5208d6709c1597b0700739d4daa06e59 to your computer and use it in GitHub Desktop.
import logging
from asyncio import Queue
from typing import Callable, TypeVar, Generic, AsyncIterable
T = TypeVar("T")


class EventEmitter(Generic[T]):
    """Synchronous fan-out of values of type ``T`` to registered callbacks.

    Every instance owns its own listener set. Listeners are stored in a
    ``set``, so registering the same callable twice has no extra effect and
    the order in which listeners are invoked is unspecified.
    """

    # Annotation only: the set itself is created per instance in __init__.
    # Assigning ``set()`` here would make every emitter share one listener set.
    listeners: set[Callable[[T], None]]

    def __init__(self) -> None:
        """Create an emitter with no listeners."""
        self.listeners = set()

    def add_listener(self, listener: Callable[[T], None]) -> None:
        """Register ``listener`` to be called with each emitted value."""
        self.listeners.add(listener)

    def remove_listener(self, listener: Callable[[T], None]) -> None:
        """Unregister ``listener``.

        Raises:
            KeyError: if ``listener`` is not currently registered.
        """
        self.listeners.remove(listener)

    def emit(self, data: T) -> None:
        """Call every listener registered at the time of the call with ``data``.

        An exception raised by one listener is logged and does not prevent
        the remaining listeners from running. ``KeyboardInterrupt``,
        ``SystemExit`` and other non-``Exception`` errors propagate.
        """
        # Iterate over a snapshot: a listener may add or remove listeners
        # (including itself) while being called, and mutating a set during
        # iteration raises RuntimeError.
        for listener in tuple(self.listeners):
            try:
                listener(data)
            except Exception:
                # Best-effort delivery: record the failure and keep going.
                logging.getLogger(__name__).exception(
                    "Event listener %r raised an exception", listener
                )
async def to_async_iterable(emitter: EventEmitter[T]) -> AsyncIterable[T]:
    """Expose the values emitted by ``emitter`` as an async stream.

    Emitted values are buffered in an unbounded ``asyncio.Queue`` and yielded
    one at a time. The stream never finishes by itself; the consumer decides
    when to stop iterating.

    Because this is an async generator, nothing runs until iteration starts:
    the listener is attached on the first step, so values emitted before
    that are not delivered. The listener is detached again when the
    generator exits (for example when it is closed).
    """
    pending: Queue[T] = Queue()
    # The queue's own non-blocking put is the listener; keep one reference so
    # the very same callable is handed to remove_listener later.
    enqueue = pending.put_nowait
    emitter.add_listener(enqueue)
    try:
        while True:
            item = await pending.get()
            yield item
    finally:
        emitter.remove_listener(enqueue)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment