Skip to content

Instantly share code, notes, and snippets.

@oleiade
Created November 28, 2013 10:55
Show Gist options
  • Save oleiade/7690179 to your computer and use it in GitHub Desktop.
Save oleiade/7690179 to your computer and use it in GitHub Desktop.
A ticker python implementation using a multiprocessing.Process
import time
import multiprocessing
from datetime import datetime, timedelta
class Ticker(multiprocessing.Process):
"""
:param wake_interval: in seconds
:type wake_interval: float
:param default_interval: in seconds
:type default_interval: int
"""
def __init__(self, wake_interval=0.01, default_interval=100, *args, **kwargs):
self._manager = multiprocessing.Manager()
self._running = self._manager.Value(bool, False)
self._scheduled_ticks = multiprocessing.Queue()
self._descheduled_ticks = self._manager.list()
self.wake_interval = wake_interval
self.default_interval = default_interval
self.ticked = self._manager.list()
super(Ticker, self).__init__(*args, **kwargs)
def schedule(self, identifier, interval=None):
"""Schedules a tick at a provided interval
:param identifier: Unique identifier of the tick to
be set
:type identifier: str
:param interval: Interval for the tick to happen (seconds)
:type interval: int
"""
interval = interval or self.default_interval
heartbeat_at = datetime.now() + timedelta(seconds=interval)
schedule = (identifier, heartbeat_at, interval)
self._scheduled_ticks.put(schedule)
def deschedule(self, identifier):
"""Marks a tick event as descheduled
:param identifier: Unique identifier of the tick to
be set
:type identifier: str
"""
self._descheduled_ticks.append(identifier)
def run(self):
next_tick = None
self._running.value = True
while self._running.value is True:
now = datetime.now()
if next_tick is None:
# Pop the first tick event from the queue. And
# store it as the next_tick event.
if self._scheduled_ticks.empty() is False:
next_tick = self._scheduled_ticks.get_nowait()
else:
# If next tick is marked as descheduled,
# ignore it (set it to None) and pop it from
# the descheduled ticks list.
if next_tick[0] in self._descheduled_ticks:
self._descheduled_ticks.remove(next_tick[0])
next_tick = None
if next_tick and now >= next_tick[1]:
# If tick event is not already present
# in ticked events list, then add it
if next_tick[0] not in self.ticked:
self.ticked.append(next_tick[0])
# Reschedule the tick event at interval in the future,
# and set next_tick to None for the ticks queue to be popped
new_heartbeat_at = now + timedelta(seconds=next_tick[2])
self._scheduled_ticks.put((next_tick[0], new_heartbeat_at, next_tick[2]))
next_tick = None
time.sleep(self.wake_interval)
def stop(self):
self._running.value = False
self.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment