|
import heapq |
|
import itertools |
|
import operator |
|
import time |
|
|
|
import dateutil |
|
import trio |
|
|
|
import aioevent |
|
|
|
|
|
class BlockingAsyncIterable: |
|
def __aiter__(self): |
|
return self |
|
|
|
async def __anext__(self): |
|
await trio.sleep_forever() |
|
|
|
|
|
class ConstantValue: |
|
def __init__(self, value): |
|
self._value = value |
|
self.values = BlockingAsyncIterable() |
|
|
|
@property |
|
def value(self): |
|
return self._value |
|
|
|
|
|
class ReadOnlyDynamicValue: |
|
def __init__(self): |
|
self.values = aioevent.EventQueue() |
|
|
|
async def notify_value_changed(self): |
|
await self.values.send(self.value) |
|
|
|
|
|
def get_values_attribute(obj): |
|
try: |
|
return obj.values |
|
except AttributeError: |
|
return obj |
|
|
|
|
|
class DynamicValue(ReadOnlyDynamicValue): |
|
def __init__(self, nursery): |
|
super().__init__() |
|
self.__nursery = nursery |
|
self.__value_copy_nursery = None |
|
self.__source = None |
|
|
|
def set_initial_value(self, initial_value): |
|
self.__value = initial_value |
|
|
|
async def set_value(self, new_value): |
|
self.__value = new_value |
|
await self.notify_value_changed() |
|
|
|
def get_value(self): |
|
return self.__value |
|
|
|
@property |
|
def value(self): |
|
return self.get_value() |
|
|
|
@property |
|
def source(self): |
|
return self.__source |
|
|
|
@source.setter |
|
def source(self, values): |
|
if self.__value_copy_nursery: |
|
self.__value_copy_nursery.cancel_scope.cancel() |
|
self.__value_copy_nursery = None |
|
self.__source = get_values_attribute(values) |
|
if self.__source is None: |
|
return |
|
source_values_aiter = self.__source.__aiter__() |
|
try: |
|
initial_source_value = values.value |
|
except AttributeError: |
|
initial_source_value = None |
|
self.__nursery.start_soon( |
|
self.__value_copy, |
|
self.__source, |
|
source_values_aiter, |
|
initial_source_value, |
|
) |
|
|
|
async def __value_copy(self, values, values_aiter, initial_source_value): |
|
if values is not self.__source: |
|
return |
|
if initial_source_value is not None: |
|
await self.set_value(initial_source_value) |
|
async with trio.open_nursery() as nursery: |
|
self.__value_copy_nursery = nursery |
|
async for new_value in values_aiter: |
|
await self.set_value(new_value) |
|
|
|
|
|
async def as_any_values_change(nursery, valuess): |
|
# It is necessary for a nursery to be provided by the caller here, rather |
|
# than us creating one ourselves, due to this being an async generator and |
|
# yielding from inside a nursery block isn't permitted by trio: |
|
# https://github.com/python-trio/trio/issues/264 |
|
sender, receiver = trio.open_memory_channel(0) |
|
async def send_values_to_queue(values, task_status=trio.TASK_STATUS_IGNORED): |
|
task_status.started() |
|
async for value in get_values_attribute(values): |
|
await sender.send(value) |
|
for values in valuess: |
|
await nursery.start(send_values_to_queue, values) |
|
# all readers are started and blocking at the async for now |
|
async for value in receiver: |
|
yield value |
|
|
|
|
|
class DerivedValue(ReadOnlyDynamicValue): |
|
def __init__(self, nursery, valuess=None): |
|
super().__init__() |
|
readers = [get_values_attribute(values).__aiter__() for values in valuess] |
|
changes = as_any_values_change(nursery, readers) |
|
nursery.start_soon(self.__follow_value_changes, changes) |
|
self.value = self.calculate() |
|
|
|
async def __follow_value_changes(self, changes): |
|
async for change in changes: |
|
old_value = self.value |
|
new_value = self.calculate() |
|
if new_value != old_value: |
|
self.value = new_value |
|
await self.notify_value_changed() |
|
|
|
|
|
class MaxValue(DerivedValue): |
|
def __init__(self, nursery, sources): |
|
self.sources = sources |
|
super().__init__(nursery, sources) |
|
|
|
def calculate(self): |
|
return max(v.value for v in self.sources) |
|
|
|
|
|
class ValueRatio(DerivedValue): |
|
def __init__(self, nursery, a, b): |
|
self.a = a |
|
self.b = b |
|
super().__init__(nursery, [self.a, self.b]) |
|
|
|
def calculate(self): |
|
return self.a.value / self.b.value |
|
|
|
|
|
class OrValue(DerivedValue): |
|
def __init__(self, nursery, a, b): |
|
self.a = a |
|
self.b = b |
|
super().__init__(nursery, [self.a, self.b]) |
|
|
|
def calculate(self): |
|
return self.a.value or self.b.value |
|
|
|
|
|
class ScheduledValues(ReadOnlyDynamicValue): |
|
def __init__(self, nursery, sequence, initial_value=False, time_func=time.time, sleep_func=trio.sleep): |
|
super().__init__() |
|
self._value = initial_value |
|
self._iterator = iter(sequence) |
|
self._time_func = time_func |
|
self._sleep_func = sleep_func |
|
nursery.start_soon(self._follow_schedule) |
|
|
|
@staticmethod |
|
def datetime_to_timestamp(datetime, tzinfo=None): |
|
# If tzinfo is supplied then it will be used and datetime must be |
|
# naive. If not supplied then the local time zone is used if the |
|
# datetime is naive, otherwise the datetime's own time zone is used. |
|
if datetime.tzinfo is None: |
|
tzinfo_to_use = tzinfo or dateutil.tz.tzlocal() |
|
return datetime.replace(tzinfo=tzinfo_to_use).timestamp() |
|
else: |
|
assert tzinfo is None |
|
return datetime.timestamp() |
|
|
|
@classmethod |
|
def sequence_from_rrule(cls, rrule, value, tzinfo=None): |
|
return ( |
|
(cls.datetime_to_timestamp(datetime, tzinfo), value) |
|
for datetime in rrule |
|
) |
|
|
|
@classmethod |
|
def sequence_from_rrule_value_pairs(cls, rrule_value_pairs, tzinfo=None): |
|
sequences = [ |
|
( |
|
(cls.datetime_to_timestamp(datetime, tzinfo), v) |
|
for datetime, v in zip(rrule, itertools.repeat(value)) |
|
) |
|
for rrule, value in rrule_value_pairs |
|
] |
|
return heapq.merge(*sequences, key=operator.itemgetter(0)) |
|
|
|
@classmethod |
|
def from_rrule(cls, nursery, rrule, value, **kwargs): |
|
sequence = cls.sequence_from_rrule(rrule, value) |
|
return cls(nursery, sequence, **kwargs) |
|
|
|
@classmethod |
|
def from_rrule_value_pairs(cls, nursery, rrule_value_pairs, **kwargs): |
|
sequence = cls.sequence_from_rrule_value_pairs(rrule_value_pairs) |
|
return cls(nursery, sequence, **kwargs) |
|
|
|
@property |
|
def value(self): |
|
return self._value |
|
|
|
async def _follow_schedule(self): |
|
while True: |
|
now = self._time_func() |
|
next_timestamp, next_value = next(self._iterator) |
|
if next_timestamp < now: |
|
# skip through to an event that is in the future and instead of |
|
# flapping our value set it to the most recent event that took |
|
# place in the past |
|
while next_timestamp < now: |
|
final_past_value = next_value |
|
next_timestamp, next_value = next(self._iterator) |
|
self._value = final_past_value |
|
await self.notify_value_changed() |
|
delay = next_timestamp - now |
|
await self._sleep_func(delay) |
|
self._value = next_value |
|
await self.notify_value_changed() |