Last active
July 8, 2023 12:31
-
-
Save alexshpilkin/1779e08ff62622cd92fc68b20adf870b to your computer and use it in GitHub Desktop.
Deprecating the Observer pattern in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections.abc import Awaitable | |
from heapq import heappop, heappush | |
from math import inf | |
from weakref import WeakSet | |
try: | |
from functools import singledispatch | |
except ImportError: | |
from pkgutil import simplegeneric as singledispatch | |
def iterawaitable(coro): | |
if not isinstance(coro, Awaitable): | |
async def makeawaitable(): | |
return await coro | |
coro = makeawaitable() | |
return coro.__await__() | |
@singledispatch | |
def needs(startee, started, rank): | |
return startee.needs(started, rank) | |
@singledispatch | |
def wakes(what, whom=None, when=None): | |
assert (whom is None) == (when is None) | |
return what.wakes(whom, when) | |
def update(seed): | |
stack, queue = [], [(*wakes(seed), seed)] | |
while stack or queue: | |
if stack: | |
job = val = stack.pop() | |
_, run = wakes(job) | |
else: | |
now, run, job = heappop(queue) | |
val = None | |
while True: | |
try: | |
need = run.send(val) | |
except StopIteration as e: | |
start, wake = e.value | |
stack.extend(reversed(wake)) | |
for new in start: | |
heappush(queue, (*wakes(new), new)) | |
break | |
else: | |
val = job | |
then, coro = wakes(need, job, now) | |
if coro is None: | |
continue | |
if then is not None: | |
assert then > now or then[0] == now[0] == inf #fixme | |
heappush(queue, (then, coro, need)) | |
break | |
class Job(object): | |
def __init__(self): | |
self._starts = WeakSet() | |
def __await__(self): | |
startee = yield self | |
if startee not in self._starts: | |
self._starts.add(startee) | |
def started(): | |
self._starts.remove(startee) | |
needs(startee, started, self.rank + 1) | |
return self.value | |
@property | |
def value(self): | |
return None | |
def start(self): | |
if False: yield # make this a generator function | |
return tuple(self._starts), () | |
def wakes(self, whom=None, when=None): | |
then = (self.rank, id(self)) | |
if when is not None and then[0] != inf and then <= when: #fixme | |
return None, None | |
return then, self.start() | |
class primary(Job): | |
def __init__(self, value): | |
super(primary, self).__init__() | |
self._value = value | |
@property | |
def value(self): | |
return self._value | |
@value.setter | |
def value(self, value): | |
self._value = value | |
update(self) | |
@property | |
def rank(self): | |
return 0 | |
class derived(Job): | |
def __init__(self, calc=None): | |
super(derived, self).__init__() | |
if calc is not None: | |
self._calc = calc | |
self._coro = None | |
self._started = [] | |
self.rank = inf | |
@property | |
def value(self): | |
try: | |
return self._value | |
except AttributeError: | |
pass | |
update(self) | |
return self._value | |
def start(self): | |
for started in self._started: | |
started() | |
self._started.clear() | |
self.rank = 0 | |
self._wakes = [] | |
return self.__body(super(derived, self).start()) | |
def __body(self, coro): | |
self._value = yield from iterawaitable(self._calc()) | |
starts, wakes_ = yield from coro | |
wakes = self._wakes | |
wakes.extend(wakes_) | |
del self._wakes | |
self._coro = None | |
return starts, wakes | |
def needs(self, started, rank): | |
assert self._coro is not None | |
self._started.append(started) | |
self.rank = max(self.rank, rank) | |
def wakes(self, whom=None, when=None): | |
if self._coro is not None: | |
if whom is not None: | |
self._wakes.append(whom) | |
return None, self._coro | |
then, coro = super(derived, self).wakes(whom, when) | |
if coro is not None: | |
if whom is not None: | |
self._wakes.append(whom) | |
self._coro = coro | |
return then, coro | |
### example ### | |
c = primary(True) | |
@derived | |
async def x(): | |
print('updating x') | |
return 42 if await c else await y | |
@derived | |
async def y(): | |
print('updating y') | |
return await x if await c else 57 | |
@derived | |
async def z(): | |
print('updating z') | |
return await y # from z | |
@derived | |
async def w(): | |
print('updating w') | |
return await y # from w | |
### why we may have then == when | |
p = primary(14) | |
f = primary(False) | |
@derived | |
async def d(): | |
print('updating d') | |
return await p | |
@derived | |
async def e(): | |
print('updating e') | |
if not await f: | |
return -1 | |
x = await d | |
y = await d | |
z = await d | |
return x + y + z | |
# d.value, e.value # => 14, -1 | |
# d.rank, e.rank # => 1, 1 | |
# f.value = True # BOOM |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment