Last active
December 25, 2015 21:09
-
-
Save dplepage/7040646 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bisect | |
from collections import namedtuple | |
import datetime as dt | |
Event = namedtuple("Event", ('timestamp', 'value'))


class Timeline(object):
    """A value that changes over time, recorded as timestamped events.

    ``initial`` is the value in effect before the first event.  ``events``
    is a list of ``Event(timestamp, value)`` tuples kept sorted by
    timestamp.  ``clock`` is any object with a ``now()`` method whose result
    is comparable with the stored timestamps; it defaults to
    ``datetime.datetime``.
    """

    def __init__(self, initial=None, events=(), clock=None):
        """Build a timeline.

        Args:
            initial: value returned for times before the first event.
            events: iterable of ``(timestamp, value)`` pairs.
            clock: object providing ``now()``; defaults to ``dt.datetime``.
        """
        if clock is None:
            clock = dt.datetime
        self.initial = initial
        # Materialize a real list: on Python 3 ``map`` is a lazy iterator,
        # which is always truthy and supports neither indexing nor bisection.
        # Sorting on the timestamp alone keeps the lookups below valid for
        # unsorted input; the sort is stable, so already-sorted input and the
        # relative order of equal timestamps are left untouched.
        self.events = sorted(
            (Event._make(pair) for pair in events),
            key=lambda event: event.timestamp,
        )
        self.clock = clock

    def _index_after(self, timestamp):
        """Return the index just past the last event at or before *timestamp*.

        This is ``bisect_right`` keyed on the timestamp only.  It is
        hand-written because ``bisect`` only accepts ``key=`` from Python
        3.10, and comparing whole ``Event`` tuples would fall through to
        comparing the values (which may not be orderable) on equal timestamps.
        """
        lo, hi = 0, len(self.events)
        while lo < hi:
            mid = (lo + hi) // 2
            if timestamp < self.events[mid].timestamp:
                hi = mid
            else:
                lo = mid + 1
        return lo

    def set_latest(self, value):
        """Overwrite the most recent value without adding a new event.

        Replaces the value of the last event, or ``initial`` when there are
        no events yet.
        """
        if self.events:
            # namedtuple's copy-with-changes method is ``_replace``.
            self.events[-1] = self.events[-1]._replace(value=value)
        else:
            self.initial = value

    def insert(self, value, timestamp=None):
        """Record *value* at *timestamp* (default: ``clock.now()``).

        The event is placed after any existing events with the same
        timestamp, so the most recently inserted one is the last of its group.
        """
        if timestamp is None:
            timestamp = self.clock.now()
        self.events.insert(self._index_after(timestamp), Event(timestamp, value))

    def val_at_time(self, timestamp=None):
        """Return the value in effect at *timestamp* (default: ``clock.now()``).

        That is the value of the last event whose timestamp is <= *timestamp*
        (for equal timestamps, the most recently inserted one), or ``initial``
        when no such event exists.
        """
        if timestamp is None:
            timestamp = self.clock.now()
        index = self._index_after(timestamp)
        if index == 0:
            return self.initial
        return self.events[index - 1].value

    def current(self):
        """Return the value in effect at the clock's current time."""
        return self.val_at_time(None)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime as dt | |
from dfiance import Dictifier, Field, Invalid, ErrorAggregator, DateTime | |
from dfiance import NamedTuple, List | |
from timeline import Timeline, Event | |
class TimelineDfier(Dictifier):
    """Dictifier that converts ``Timeline`` objects to and from plain dicts.

    The dict form has two keys: ``'initial'`` (the dictified initial value)
    and ``'events'`` (the dictified list of ``Event`` tuples).
    """

    def __init__(self, subtype, timetype=None):
        """Set up the sub-dictifiers.

        Args:
            subtype: dictifier for the timeline's values; passed through
                ``Field.asfield``.
            timetype: optional dictifier for timestamps; when falsy,
                ``DateTime()`` is used.  Timestamps are declared
                ``not_none`` either way.
        """
        self.val_dfier = Field.asfield(subtype)
        if timetype:
            self.time_dfier = Field.asfield(timetype, not_none=True)
        else:
            self.time_dfier = Field(DateTime(), not_none=True)
        self.event_dfier = NamedTuple(Event(self.time_dfier, self.val_dfier))
        self.list_dfier = List(elt_type=self.event_dfier)

    def dictify(self, value, **kwargs):
        """Return the dict form of the Timeline *value*."""
        return dict(
            initial=self.val_dfier.dictify(value.initial, **kwargs),
            events=self.list_dfier.dictify(value.events, **kwargs),
        )

    def undictify(self, value, **kwargs):
        """Rebuild a ``Timeline`` from its dict form.

        Recognized kwargs (besides those forwarded to the sub-dictifiers):
            fail_early: passed to the error aggregator as ``autoraise``.
            timeline_clock: clock for the new Timeline (default
                ``dt.datetime``).

        Raises:
            Invalid: if *value* is not a dict, or if a sub-field fails.
        """
        if not isinstance(value, dict):
            raise Invalid('not_dict')
        error_agg = ErrorAggregator(autoraise=kwargs.get('fail_early', False))
        with error_agg.checking_sub('initial'):
            initial = self.val_dfier.undictify(value.get('initial'), **kwargs)
        with error_agg.checking_sub('events'):
            events = self.list_dfier.undictify(value.get('events'), **kwargs)
        # Surface collected errors before using the results; otherwise a
        # failed sub-check leaves ``initial``/``events`` unbound and the
        # caller sees a NameError instead of the validation error.
        # NOTE(review): relies on dfiance's ErrorAggregator.raise_if_any()
        # -- confirm against the installed dfiance version.
        error_agg.raise_if_any()
        clock = kwargs.get('timeline_clock', dt.datetime)
        return Timeline(initial, events, clock)

    def validate(self, value, **kwargs):
        """Check that *value* is a ``Timeline`` with valid contents.

        Raises:
            Invalid: ``'type_error'`` if *value* is not a Timeline; the
                sub-dictifiers' own errors otherwise.
        """
        # The object under validation is a Timeline, not a dictifier.
        if not isinstance(value, Timeline):
            raise Invalid('type_error')
        # Forward kwargs, as dictify/undictify do.
        self.val_dfier.validate(value.initial, **kwargs)
        self.list_dfier.validate(value.events, **kwargs)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime as dt | |
from dfiance import SchemaObj, Int, String, DateTime | |
from timeline import Timeline | |
from timeline_dfier import TimelineDfier | |
def _format(t): | |
yield "Initial: {}".format(t.initial) | |
for ts, val in t.events: | |
yield '{}: {}'.format(ts, val) | |
def format(t):
    """Return the lines produced by ``_format(t)`` joined with newlines."""
    lines = list(_format(t))
    return '\n'.join(lines)
class Point(SchemaObj):
    """Demo schema object: integer ``x``/``y`` fields and a string ``label``."""

    field_types = {'x': Int(), 'y': Int(), 'label': String()}

    def __str__(self):
        """Render as ``label@x,y``."""
        label, x, y = self.label, self.x, self.y
        return "{}@{},{}".format(label, x, y)
# Demo 1: a Timeline using the default clock (dt.datetime).
t = Timeline(Point(x=1, y=1, label="WP"))
t.insert(Point(x=1, y=2, label="WP"))
# Capture a timestamp between the two clock-stamped inserts so that a value
# can be back-inserted at that point further down.
then = dt.datetime.now()
t.insert(Point(x=2, y=4, label="WQ"))
# Explicit timestamp: this event is expected to land between the two above.
t.insert(Point(x=1, y=3, label="WP"), timestamp=then)
# The second argument of the assert is its failure message.
assert t.current() == Point(x=2, y=4, label="WQ"), t.current()
assert t.val_at_time(then) == Point(x=1, y=3, label="WP")
# Round-trip the timeline through its dict form.
dfier = TimelineDfier(Point.dfier())
d = dfier.dictify(t)
# Timestamps are not known in advance, so dictify them separately for the
# expected-value comparison.
times = [DateTime().dictify(e.timestamp) for e in t.events]
assert d == {
    'initial': {'y': 1, 'x': 1, 'label': 'WP'},
    'events': [
        (times[0], {'y': 2, 'x': 1, 'label': 'WP'}),
        (times[1], {'y': 3, 'x': 1, 'label': 'WP'}),
        (times[2], {'y': 4, 'x': 2, 'label': 'WQ'})
    ]
}
t2 = dfier.undictify(d)
# Compare textual renderings of the original and the rebuilt timeline.
assert format(t) == format(t2)
# Example with a custom clock that expresses time as an integer | |
class TurnClock(object):
    """Clock whose notion of time is a manually advanced turn counter."""

    def __init__(self, turn=0):
        """Start the clock at *turn*."""
        self.turn = turn

    def now(self):
        """Return the current turn."""
        current_turn = self.turn
        return current_turn

    def advance(self, t=1):
        """Move the clock forward by *t* turns."""
        self.turn += t

    def set(self, t):
        """Jump directly to turn *t*."""
        self.turn = t
# Demo 2: the same scenario driven by the integer TurnClock.
clock = TurnClock(0)
t = Timeline(Point(x=1, y=1, label="WP"), clock=clock)
clock.advance()
t.insert(Point(x=1, y=2, label="WP"))  # stamped at turn 1
clock.advance()
then = clock.now()  # turn 2, used for the back-insert below
clock.advance()
t.insert(Point(x=2, y=4, label="WQ"))  # stamped at turn 3
# Explicit timestamp: insert at turn 2, between the two events above.
t.insert(Point(x=1, y=3, label="WP"), timestamp=then)
# The second argument of the assert is its failure message.
assert t.current() == Point(x=2, y=4, label="WQ"), t.current()
assert t.val_at_time(then) == Point(x=1, y=3, label="WP")
# Timestamps are plain ints here, so pass Int() as the time dictifier.
dfier = TimelineDfier(Point.dfier(), Int())
d = dfier.dictify(t)
assert d == {
    'initial': {'y': 1, 'x': 1, 'label': 'WP'},
    'events': [
        (1, {'y': 2, 'x': 1, 'label': 'WP'}),
        (2, {'y': 3, 'x': 1, 'label': 'WP'}),
        (3, {'y': 4, 'x': 2, 'label': 'WQ'})
    ]
}
t2 = dfier.undictify(d)
# Compare textual renderings of the original and the rebuilt timeline.
assert format(t) == format(t2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment