Skip to content

Instantly share code, notes, and snippets.

@eldonwilliams
Created February 15, 2022 20:12
Show Gist options
  • Save eldonwilliams/dbb6f5d90a67087f2d08c052fb36463c to your computer and use it in GitHub Desktop.
Save eldonwilliams/dbb6f5d90a67087f2d08c052fb36463c to your computer and use it in GitHub Desktop.
# Here's an example of how to use it
import TimeProvider
def __main__():
    """Demonstrate TimeProvider with a 1 Hz heartbeat and a 60 Hz frame ticker.

    Two providers are created, one handler is attached to each, and both
    providers are started. The heartbeat handler prints a fixed message;
    the frame handler prints the heartbeat provider's current tick count.
    """
    heartbeat = TimeProvider.TimeProvider(1)
    frames = TimeProvider.TimeProvider(60)

    def announce_heartbeat(delta, tick):
        # Fired by the heartbeat provider on each of its ticks.
        print("Heartbeat!")

    def report_heartbeat_count(delta, tick):
        # Fired by the frame provider; reads the *heartbeat* tick counter.
        print("Heartbeat tick # {}".format(heartbeat.tick))

    # Register the handlers in the same order as the providers were built.
    heartbeat.connect_to_tick(announce_heartbeat)
    frames.connect_to_tick(report_heartbeat_count)

    heartbeat.start()
    frames.start()


if __name__ == "__main__":
    __main__()
# GIST NOTE
"""
TimeProvider is a basic utility which will give a *reliable* signal X times a second.
It isn't guaranteed to be 100% reliable; each signal connection is passed the delta time as well as the tick number.
The timer also accounts for the time it takes to execute the connections.
Possible efficiency increases:
- Wrap each signal connection in a thread and wait for them all to resolve allowing multiple to run at once and keeping one for yielding (timeouts could be imposed)
- Fix the delta time for the first two frames: currently it initializes to 0 (which is normal, I think) and the second frame is wonky; afterward I get the expected behaviour
- Is it compatible with different OSes?
Also, keep in mind that this is the first module I've made in possibly >2 years in Python, I've lost a lot of experience in Python.
"""
import threading
import timeit
# Provides methods to invoke a function every X seconds.
class TimeProvider(object):
    """Call connected functions a fixed number of times per second.

    A background thread runs the tick loop. On every tick each connected
    callable is invoked as ``connection(delta, tick)`` where ``delta`` is
    the time in seconds between the start of the previous tick and the
    start of this one (0 on the first tick) and ``tick`` is the 1-based
    tick number since the last ``start()``. The time spent inside the
    connections is subtracted from the pause between ticks.
    """

    def __init__(self, tickrate):
        """Create a stopped provider that will tick ``tickrate`` times a second."""
        self.tickrate = tickrate
        # Number of ticks fired since the provider was last started.
        self.tick = 0
        # Seconds between the two most recent tick starts.
        self.current_delta = 0
        # True between start() and stop().
        self.active = False
        # Worker thread running handle_thread(), or None when stopped.
        self.thread = None
        # Callables invoked on every tick, in registration order.
        self.connections = []
        # Signals the current worker to finish; replaced on every start()
        # so a worker from an earlier run can never be revived.
        self._stop_event = threading.Event()

    def handle_thread(self):
        """Run the tick loop until the provider is stopped (thread target)."""
        # Capture this run's event: start() installs a fresh one each time.
        stop_event = self._stop_event
        previous_start = None
        while self.active and not stop_event.is_set():
            tick_start = timeit.default_timer()
            # Measure start-to-start so every tick after the first sees
            # roughly one full interval instead of the callback runtime.
            if previous_start is not None:
                self.current_delta = tick_start - previous_start
            previous_start = tick_start
            self.tick += 1
            # Iterate a snapshot so connecting during a tick is safe.
            for connection in list(self.connections):
                if callable(connection):
                    connection(self.current_delta, self.tick)
            runtime = timeit.default_timer() - tick_start
            remaining = (1 / self.tickrate) - runtime
            if remaining > 0:
                # Interruptible sleep: returns early once stop() is called.
                stop_event.wait(remaining)

    def start(self, callback=None):
        """Start ticking on a new thread, then call ``callback`` if given.

        If the provider is already active nothing is restarted; the
        callback (when callable) is still invoked.
        """
        if self.active:
            if callable(callback):
                callback()
            return
        # Reset these values
        self.tick = 0
        self.current_delta = 0
        self._stop_event = threading.Event()
        self.active = True
        # Start thread
        self.thread = threading.Thread(target=self.handle_thread)
        self.thread.start()
        if callable(callback):
            callback()

    def stop(self, callback=None):
        """Stop ticking, reset the counters, then call ``callback`` if given.

        Blocks until the worker thread has finished its current tick,
        except when called from inside a connection (i.e. on the worker
        thread itself), where joining is impossible. If the provider is
        not active only the callback (when callable) is invoked.
        """
        if not self.active:
            if callable(callback):
                callback()
            return
        self.active = False
        self._stop_event.set()
        worker = self.thread
        self.thread = None
        # Wait for the worker so it cannot touch the counters after the
        # reset below; a thread cannot join itself.
        if worker is not None and worker is not threading.current_thread():
            worker.join()
        self.tick = 0
        self.current_delta = 0
        if callable(callback):
            callback()

    def connect_to_tick(self, function):
        """Register ``function`` to run on every tick; usable as a decorator.

        Returns ``function`` unchanged so the decorated name stays bound
        to the original callable.
        """
        self.connections.append(function)
        return function
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment