Created
October 10, 2017 22:07
-
-
Save zrbecker/fb9a27914344d1797cd9f5acdba169ee to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''Utilities for managing time at different resolutions'''
from collections import defaultdict, deque, namedtuple
from datetime import datetime, timedelta
import logging

logging.basicConfig(level=logging.DEBUG)
# Comment out for debug messages.
# NOTE(review): logging.disable() is process-wide -- it silences every logger
# in the importing application, not only this module's messages.
logging.disable(logging.CRITICAL)
def get_bucket_time(start_time, cadence, time):
    '''Gets the timestamp of the bucket which `time` belongs to.

    Buckets form a grid of width `cadence` anchored at `start_time`; the
    result is the start of the grid cell containing `time`. Times earlier
    than `start_time` belong to earlier cells (the offset is floored, not
    truncated toward zero).

    Example: Get the timestamp for the hour or day

    >>> start_time = datetime(2017, 1, 1)
    >>> time = datetime(2017, 1, 1, 11, 30, 15)
    >>> get_bucket_time(start_time, timedelta(hours=1), time)  # get hour
    datetime.datetime(2017, 1, 1, 11, 0)
    >>> get_bucket_time(start_time, timedelta(days=1), time)  # get day
    datetime.datetime(2017, 1, 1, 0, 0)

    A time before `start_time` falls in an earlier bucket:

    >>> get_bucket_time(start_time, timedelta(hours=1),
    ...                 datetime(2016, 12, 31, 23, 30))
    datetime.datetime(2016, 12, 31, 23, 0)
    '''
    # ``timedelta // timedelta`` is exact integer floor division. The former
    # ``int(a / b)`` went through a float and truncated toward zero, which
    # put times just before ``start_time`` into the ``start_time`` bucket.
    return start_time + ((time - start_time) // cadence) * cadence
# One time bucket: ``time`` is the bucket's start timestamp and ``data`` is
# the payload that TimeBuckets creates for it via its data_factory.
Bucket = namedtuple('Bucket', 'time data')
class TimeBuckets:
    '''A class for partitioning a period of time into a set of buckets.

    The period consists of ``num_buckets`` consecutive buckets, each
    ``cadence`` wide, the newest of which starts at ``last_bucket_time``.
    Every bucket is a ``Bucket(time, data)`` whose ``data`` is built by
    ``data_factory``. ``update_last_bucket_time`` slides the period: buckets
    that fall outside it are dropped, empty buckets are created for newly
    covered times, and buckets that stay inside keep their data.
    '''

    def __init__(self,
                 last_bucket_time=None,
                 cadence=timedelta(seconds=1),
                 num_buckets=300,
                 data_factory=lambda: defaultdict(int)):
        '''Create ``num_buckets`` empty buckets ending at ``last_bucket_time``.

        Args:
            last_bucket_time: start time of the newest bucket; it also anchors
                the grid that later times are aligned to. Defaults to
                ``datetime.now()`` taken when the instance is created.
            cadence: width of each bucket.
            num_buckets: number of buckets kept in the period.
            data_factory: zero-argument callable that builds each bucket's
                ``data``; defaults to ``defaultdict(int)``.
        '''
        if last_bucket_time is None:
            # Resolved per instance. A ``datetime.now()`` default argument is
            # evaluated only once, when the class body runs at import time.
            last_bucket_time = datetime.now()
        logging.debug('TimeBucket constructor')
        logging.debug('constructor> last_bucket_time=%s', last_bucket_time)
        logging.debug('constructor> cadence=%s', cadence)
        logging.debug('constructor> num_buckets=%s', num_buckets)
        logging.debug('constructor> data_factory=%s', data_factory)
        self._first_bucket_time = (last_bucket_time
                                   - (num_buckets - 1) * cadence)
        self._last_bucket_time = last_bucket_time
        self._cadence = cadence
        self._num_buckets = num_buckets
        self._data_factory = data_factory
        # Bucket start time -> Bucket, for direct lookup in get_bucket.
        self._bucket_finder = {}
        # Buckets ordered oldest (left) to newest (right).
        self._buckets = deque()
        self.update_last_bucket_time(self._last_bucket_time)

    def get_bucket(self, time):
        '''Get the bucket that `time` falls in. Raise KeyError if there is no
        bucket for that time.

        Example:

        Create a set of hourly buckets for 2015 January 1st.

        >>> buckets = TimeBuckets(last_bucket_time=datetime(2015, 1, 1, 23, 0),
        ...                       cadence=timedelta(hours=1),
        ...                       num_buckets=24)

        Get bucket for 12:30:15pm, i.e., the 12:00pm bucket.

        >>> bucket = buckets.get_bucket(datetime(2015, 1, 1, 12, 30, 15))
        >>> bucket.data['count']
        0

        Increment count for 12:00pm bucket.

        >>> bucket.data['count'] += 1
        >>> bucket.data['count']
        1

        Get bucket for 12:45pm, i.e., the 12:00pm bucket.

        >>> bucket = buckets.get_bucket(datetime(2015, 1, 1, 12, 45))
        >>> bucket.data['count']
        1

        Get bucket for 1:45pm, i.e., the 1:00pm bucket.

        >>> bucket = buckets.get_bucket(datetime(2015, 1, 1, 13, 45))
        >>> bucket.data['count']
        0

        Get bucket for 12:00pm next day.

        >>> buckets.get_bucket(datetime(2015, 1, 2, 12))
        Traceback (most recent call last):
            ...
        KeyError: datetime.datetime(2015, 1, 2, 12, 0)
        '''
        logging.debug('get_bucket(time=%s)', time)
        bucket_time = get_bucket_time(self._first_bucket_time,
                                      self._cadence,
                                      time)
        # Lazy %-style args: nothing is formatted while logging is disabled,
        # which matters because this method runs once per lookup.
        logging.debug('get_bucket> first_time=%s', self._first_bucket_time)
        logging.debug('get_bucket> last_time=%s', self._last_bucket_time)
        logging.debug('get_bucket> bucket_time=%s', bucket_time)
        logging.debug('get_bucket> num_buckets=%s', len(self._bucket_finder))
        return self._bucket_finder[bucket_time]

    def update_last_bucket_time(self, last_bucket_time):
        '''Updates the set of buckets so the period ends at last_bucket_time.

        last_bucket_time will be truncated to fit the cadence of the current set
        of buckets, e.g. if the current buckets are hourly on the hour, then
        12:15pm is truncated to 12:00pm, and 3:45am is truncated to 3:00am.
        Buckets that remain inside the new period keep their data.

        Example:

        Create a set of hourly buckets for 2015 January 1st.

        >>> buckets = TimeBuckets(last_bucket_time=datetime(2015, 1, 1, 23),
        ...                       cadence=timedelta(hours=1),
        ...                       num_buckets=24)

        Get first bucket of time period, i.e. 12:00am on January 1st.

        >>> jan1st_12am = buckets.get_bucket(datetime(2015, 1, 1, 0))
        >>> jan1st_12am.time
        datetime.datetime(2015, 1, 1, 0, 0)

        Set some data on the 1:00am bucket for January 1st.

        >>> jan1st_1am = buckets.get_bucket(datetime(2015, 1, 1, 1))
        >>> jan1st_1am.time
        datetime.datetime(2015, 1, 1, 1, 0)
        >>> jan1st_1am.data['count'] = 100

        We don't have a 12:00am on January 2nd bucket.

        >>> jan2nd_12am = buckets.get_bucket(datetime(2015, 1, 2, 0))
        Traceback (most recent call last):
            ...
        KeyError: datetime.datetime(2015, 1, 2, 0, 0)

        Move the time period up by one bucket.

        >>> buckets.update_last_bucket_time(datetime(2015, 1, 2, 0))

        Now the 12:00am on January 1st bucket no longer exists.

        >>> jan1st_12am = buckets.get_bucket(datetime(2015, 1, 1, 0))
        Traceback (most recent call last):
            ...
        KeyError: datetime.datetime(2015, 1, 1, 0, 0)

        We still have our data for 1:00am on January 1st.

        >>> jan1st_1am = buckets.get_bucket(datetime(2015, 1, 1, 1))
        >>> jan1st_1am.time
        datetime.datetime(2015, 1, 1, 1, 0)
        >>> jan1st_1am.data['count']
        100

        The 12:00am on January 2nd bucket now exists.

        >>> jan2nd_12am = buckets.get_bucket(datetime(2015, 1, 2, 0))
        >>> jan2nd_12am.time
        datetime.datetime(2015, 1, 2, 0, 0)

        Updating to a time inside the newest bucket keeps that bucket's data.

        >>> jan2nd_12am.data['count'] = 7
        >>> buckets.update_last_bucket_time(datetime(2015, 1, 2, 0, 30))
        >>> buckets.get_bucket(datetime(2015, 1, 2, 0)).data['count']
        7
        '''
        logging.debug('update_time(last_bucket_time=%s)', last_bucket_time)
        # Align to the existing grid (anchored at the current first bucket).
        self._last_bucket_time = get_bucket_time(self._first_bucket_time,
                                                 self._cadence,
                                                 last_bucket_time)
        self._first_bucket_time = (self._last_bucket_time
                                   - (self._num_buckets - 1) * self._cadence)
        logging.debug('update_time> first_time=%s', self._first_bucket_time)
        logging.debug('update_time> last_time=%s', self._last_bucket_time)
        self._filter_buckets()
        self._fill_buckets()

    def _filter_buckets(self):
        '''Drop buckets outside [first_bucket_time, last_bucket_time].'''
        logging.debug('filter_buckets> removing buckets that are too old.')
        while (self._buckets
               and self._buckets[0].time < self._first_bucket_time):
            del self._bucket_finder[self._buckets.popleft().time]
        logging.debug('filter_buckets> removing buckets that are too new.')
        # Strictly newer only: the bucket starting exactly at last_bucket_time
        # is still inside the period, and dropping it would wipe its data.
        while (self._buckets
               and self._buckets[-1].time > self._last_bucket_time):
            del self._bucket_finder[self._buckets.pop().time]
        logging.debug('queue_length=%s', len(self._buckets))
        logging.debug('finder_length=%s', len(self._bucket_finder))
        logging.debug('filter_buckets> done filtering buckets.')

    def _fill_buckets(self):
        '''Create empty buckets so every cadence step of the period exists.'''
        if not self._buckets:
            logging.debug('fill_buckets> buckets are empty, adding bucket')
            bucket = Bucket(self._first_bucket_time, self._data_factory())
            self._bucket_finder[self._first_bucket_time] = bucket
            self._buckets.appendleft(bucket)
        logging.debug('fill_buckets> filling older buckets')
        while self._buckets[0].time > self._first_bucket_time:
            time = self._buckets[0].time - self._cadence
            bucket = Bucket(time, self._data_factory())
            self._bucket_finder[time] = bucket
            self._buckets.appendleft(bucket)
        logging.debug('fill_buckets> filling newer buckets')
        while self._buckets[-1].time < self._last_bucket_time:
            time = self._buckets[-1].time + self._cadence
            bucket = Bucket(time, self._data_factory())
            self._bucket_finder[time] = bucket
            self._buckets.append(bucket)
        logging.debug('queue_length=%s', len(self._buckets))
        logging.debug('finder_length=%s', len(self._bucket_finder))
        logging.debug('fill_buckets> done filling buckets.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment