Skip to content

Instantly share code, notes, and snippets.

@pbabics
Last active November 2, 2019 19:38
Show Gist options
  • Save pbabics/ef023652712c3d2dec2fc2747d253b29 to your computer and use it in GitHub Desktop.
Save pbabics/ef023652712c3d2dec2fc2747d253b29 to your computer and use it in GitHub Desktop.
Simple AggregatedTimeSeries storage which allows fast retrieval of aggregated value. Aggregating function is defined by two functions, binary function `op` and inverse function `inv`.
#include <vector>
#include <exception>
/**
* Exception thrown whenever AggregatedTimeSeries instance
* is trying to access First or Last element of storage
* while storage is empty
*/
struct EmptyTimeSeries: public std::exception {
const char* what() const throw () {
return "TimeSeries storage is empty";
}
};
/**
* Exception thrown whenever AggregatedTimeSeries instance
* is requested to aggregated value for timeframe which
* has higher initial time than last element in storage
*/
struct EmptyTimeRange: public std::exception {
const char* what() const throw () {
return "Requested Time range is empty";
}
};
typedef long long int64;
typedef unsigned long long uint64;
/**
* Class used for representing aggregated time series entry
*
* @tparam V represents type of value and aggregated value
* @tparam T represents type of time entry
*/
template <class V, class T>
struct AggregatedTimeSeriesItem {
AggregatedTimeSeriesItem(const T& _time, const V& _value, const V& _aggregate) :
time(_time), value(_value), aggregate(_aggregate)
{ }
T time;
V value;
V aggregate;
};
/**
* Class used for representing aggregated time series
*
* Entries may be inserted out for order, during insert they are properly inserted
* and aggregated value is calcualted.
* Aggregating function is defined in class constructor. Actually two functions are required
* One which aggregates two values (op), and other which is able to create inverse value (inv).
*
* Aggregation is done during insertion to allow fast aggregate retrieval.
* If element is inserted into inserted on first position, aggregated value is equal to element value
* and rest of storage is updated accordingly, adding new value to the rest of aggregates
*
* If element is inserted into middle of the storage, aggregated value up to point of insertion
* is used as `previous_aggregated_value`, newly inserted element than has aggregated value equal to
* `op(previous_aggregated_value, new_value)`. Rest of storage is again updated accordingly.
*
* When aggregate is requested value is calculated as
* `op(aggregate from last storage entry, inv(aggregate from index right before requested time frame))`
*
* @tparam V represents type of value and aggregated value
* @tparam T represents type of time entry
* @tparam TimeSeriesStorage represents type of storage used for time serie
* default type is std::vector
*/
template <
typename V,
typename T,
typename TimeSeriesStorage = std::vector<AggregatedTimeSeriesItem<V, T> >
>
class AggregatedTimeSeries {
TimeSeriesStorage _storage;
const V (*_op) (const V&, const V&);
const V (*_inv) (const V&);
public:
/**
* Aggragated Time Serie constructor.
*
* Takes two arguments, first one is function defining
* aggregation of two values, second is function
* which creates inverse value. These two functions are
* used to effectively calculate aggregate of values in
* different time ranges. See class description for detail
*
* @param op Binary function defining aggregation of two values, e.g `a + b`
* @param inv Unary function creating inverse value, e.g `- a`
*/
AggregatedTimeSeries(
const V (*op) (const V&, const V&),
const V (*inv) (const V&)
);
virtual ~AggregatedTimeSeries() { }
/**
* Returns aggregated value of all values stored in time serie storage
*
* @throws EmptyTimeSeries Exception is raised when method is called on empty time serie
*/
V getAggregate() const;
/**
* @param since Minimal time which should be incorporated in aggregated value
* @return aggregated value of values stored in time serie storage
* which have time index greater or equal to ``since`` parameter.
*
* @throws EmptyTimeSeries Exception is raised when method is called on empty time serie
* @throws EmptyTimeRange Exception is raised when method is called with ``since``
* higher than time of newest value
*/
V getAggregate(const T& since) const;
/**
* @return count of all values stored in time serie storage
*/
uint64 getCount() const;
/**
* @return count of values stored in time serie storage
* which have time index greater or equal to ``since`` parameter.
* @param since Minimal time which should be incorporated in aggregated value
*/
uint64 getCount(const T& since) const;
/**
* @return value of first (oldest) element stored in time serie storage
* @throws EmptyTimeSeries Exception is raised when method is called on empty time serie
*/
const V& getLastValue() const;
/**
* @return value of last (newest) element stored in time serie storage
* @throws EmptyTimeSeries Exception is raised when method is called on empty time serie
*/
const V& getFirstValue() const;
/**
* Inserts new element into time serie, updating aggregated values
* of stored entries if necessary
*
* @param value to be inserted into time serie stoarge
* @param time at which value should be inserted
*/
void insert(const V& value, const T& time);
/**
* Clears entire time serie storage
*/
void clear();
/**
* Removes entries older than ``until`` from time serie storage
*
* @param until all elements which time value lower than this will be removed
*/
void flush(const T& until);
private:
int64 _find_insertion_point(const T& time) const;
};
template <class V, class T, class TimeSeriesStorage>
AggregatedTimeSeries<V, T, TimeSeriesStorage>::AggregatedTimeSeries(
const V (*op) (const V&, const V&),
const V (*inv) (const V&)
) :
_op(op), _inv(inv)
{
}
template <class V, class T, class TimeSeriesStorage>
V AggregatedTimeSeries<V, T, TimeSeriesStorage>::getAggregate() const
{
if (!_storage.size())
throw EmptyTimeSeries();
return _op(
_op(_storage.back().aggregate, _inv(_storage.front().aggregate)),
_storage.front().value
);
}
template <class V, class T, class TimeSeriesStorage>
V AggregatedTimeSeries<V, T, TimeSeriesStorage>::getAggregate(const T& since) const
{
if (!_storage.size())
throw EmptyTimeSeries();
int64 index = _find_insertion_point(since);
if (index < 0) {
index = -index - 1;
if (index == _storage.size())
throw EmptyTimeRange();
else if (index == 0)
return getAggregate();
else
return _op(_storage.back().aggregate, _inv(_storage[index].aggregate));
}
else if (index == 0)
return getAggregate();
else {
return _op(_storage.back().aggregate, _inv(_storage[index - 1].aggregate));
}
}
template <class V, class T, class TimeSeriesStorage>
uint64 AggregatedTimeSeries<V, T, TimeSeriesStorage>::getCount() const
{
return _storage.size();
}
template <class V, class T, class TimeSeriesStorage>
uint64 AggregatedTimeSeries<V, T, TimeSeriesStorage>::getCount(const T& since) const
{
int64 index = _find_insertion_point(since);
if (index < 0)
index = -index - 1;
return _storage.size() - index;
}
template <class V, class T, class TimeSeriesStorage>
const V& AggregatedTimeSeries<V, T, TimeSeriesStorage>::getLastValue() const
{
if (!_storage.size())
throw EmptyTimeSeries();
return _storage.back().value;
}
template <class V, class T, class TimeSeriesStorage>
const V& AggregatedTimeSeries<V, T, TimeSeriesStorage>::getFirstValue() const
{
if (!_storage.size())
throw EmptyTimeSeries();
return _storage.front().value;
}
template <class V, class T, class TimeSeriesStorage>
void AggregatedTimeSeries<V, T, TimeSeriesStorage>::insert(const V& value, const T& time)
{
int64 index = _find_insertion_point(time);
if (index < 0)
index = -index - 1;
V aggregate = value;
if (index > 0)
aggregate = _op(_storage[index - 1].aggregate, aggregate);
typename TimeSeriesStorage::iterator it = _storage.begin();
std::advance(it, index);
// Insert new element, with propper aggregated value
it = _storage.insert(
it,
AggregatedTimeSeriesItem<V, T>(
time,
value,
aggregate
)
);
// Update all elements in front of newly inserted one
for (
++it;
it != _storage.end();
++it
)
it->aggregate = aggregate = _op(aggregate, it->value);
}
template <class V, class T, class TimeSeriesStorage>
int64 AggregatedTimeSeries<V, T, TimeSeriesStorage>::_find_insertion_point(const T& time) const
{
uint64 low = 0,
high = _storage.size() - 1;
// Optimization for value before first value and after last value in storage
if (not _storage.size())
return -1;
else if (_storage[high].time < time)
return -_storage.size() - 1;
else if (_storage[low].time > time)
return -1;
while (low < high)
{
int64 selection = (high + low) / 2;
const T& selected_time = _storage[selection].time;
if (selected_time > time)
high = selection - 1;
else if (selected_time < time)
low = selection + 1;
else
return selection;
}
return -low - 1;
}
template <class V, class T, class TimeSeriesStorage>
void AggregatedTimeSeries<V, T, TimeSeriesStorage>::clear()
{
_storage.clear();
}
template <class V, class T, class TimeSeriesStorage>
void AggregatedTimeSeries<V, T, TimeSeriesStorage>::flush(const T& until)
{
int64 index = _find_insertion_point(until);
if (index < 0)
index = -index - 1;
typename TimeSeriesStorage::iterator it = _storage.begin();
std::advance(it, index);
_storage.erase(_storage.begin(), it);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment