Skip to content

Instantly share code, notes, and snippets.

@jck
Created July 13, 2015 19:01
Show Gist options
  • Save jck/0d3549435a8e60e11995 to your computer and use it in GitHub Desktop.
Save jck/0d3549435a8e60e11995 to your computer and use it in GitHub Desktop.
Experiment implementing MyHDL-style simulations using Python 3.5 and asyncio
import asyncio
import collections
import functools
import heapq
class StopSim(BaseException):
    """Signal that the running simulation should stop.

    Derives from ``BaseException`` rather than ``Exception`` so that generic
    ``except Exception`` handlers do not swallow it on its way up to the
    simulation loop.
    """
class SimEventLoop(asyncio.AbstractEventLoop):
    """Minimal asyncio event loop driven by simulated, discrete time.

    The loop keeps an integer clock that starts at 0.  Every callback that is
    ready or due at the current simulated time is executed before the clock
    advances by one unit, so no wall-clock waiting ever happens.

    Only the subset of the event-loop API needed by ``asyncio.Future`` and
    ``asyncio.Task`` is implemented: ``call_soon``/``call_later``/``call_at``,
    ``create_task``, ``create_future``, ``time``, ``stop``, ``run_forever`` and
    ``run_until_complete``.

    The simulation stops when ``stop()`` is called or when a callback raises
    ``StopSim``.
    """

    def __init__(self):
        self._time = 0                      # current simulated time
        self._scheduled = []                # heap of pending TimerHandle objects
        self._ready = collections.deque()   # FIFO of Handle objects due right now
        self._debug = False
        self._running = False
        self._stopping = False

    @staticmethod
    def _context_args(context):
        """Return the extra positional args that forward *context* to a handle.

        Passing nothing when *context* is ``None`` lets the handle pick its
        own default and keeps the call valid on asyncio versions whose
        handles take no context argument.
        """
        return () if context is None else (context,)

    def get_debug(self):
        """Return whether debug mode is enabled (always ``False`` by default)."""
        return self._debug

    def is_running(self):
        """Return ``True`` while ``run_forever`` is executing."""
        return self._running

    def create_future(self):
        """Create a future bound to this loop."""
        return asyncio.Future(loop=self)

    def create_task(self, coro, **kwargs):
        """Wrap *coro* in an ``asyncio.Task`` bound to this loop.

        Extra keyword arguments (e.g. ``name``, ``context``) are forwarded to
        ``asyncio.Task`` unchanged.
        """
        return asyncio.Task(coro, loop=self, **kwargs)

    def time(self):
        """Return the current simulated time."""
        return self._time

    def call_soon(self, callback, *args, context=None):
        """Queue *callback* to run at the current simulated time.

        The callback is not invoked from inside this call; it runs on the next
        pass of the loop, in FIFO order, before the clock advances.

        Returns:
            The ``asyncio.Handle`` for the queued callback (cancellable).
        """
        handle = asyncio.Handle(callback, args, self,
                                *self._context_args(context))
        self._ready.append(handle)
        return handle

    def call_later(self, delay, callback, *args, context=None):
        """Schedule *callback* to run *delay* time units from now.

        Returns:
            The ``asyncio.TimerHandle`` for the scheduled callback.
        """
        return self.call_at(self.time() + delay, callback, *args,
                            context=context)

    def call_at(self, when, callback, *args, context=None):
        """Schedule *callback* to run once simulated time reaches *when*.

        Returns:
            The ``asyncio.TimerHandle`` for the scheduled callback.
        """
        timer = asyncio.TimerHandle(when, callback, args, self,
                                    *self._context_args(context))
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer

    def _timer_handle_cancelled(self, handle):
        """Hook invoked by ``TimerHandle.cancel()``; nothing to do here.

        Cancelled timers stay in the heap and are skipped when they come due.
        """

    def call_exception_handler(self, context):
        """Handle an error reported by asyncio.

        Exceptions raised by a callback (reported together with its handle)
        are re-raised so they propagate out of ``run_forever``; this is also
        how ``StopSim`` reaches the loop on asyncio versions whose handles
        catch ``BaseException``.  Reports that do not come from a running
        callback are logged through asyncio's logger instead.
        """
        exc = context.get('exception')
        if exc is not None and 'handle' in context:
            raise exc
        asyncio.log.logger.error(
            context.get('message', 'Unhandled error in SimEventLoop'),
            exc_info=exc,
        )

    def _run_once(self):
        """Run everything due at the current time, then advance the clock.

        Ready callbacks are drained first, then timers whose deadline has been
        reached; callbacks queued while draining are processed in the same
        pass.  If ``stop()`` is requested, the method returns immediately
        without advancing time, leaving remaining work queued.
        """
        while not self._stopping:
            if self._ready:
                handle = self._ready.popleft()
            elif self._scheduled and self._scheduled[0]._when <= self._time:
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False
            else:
                # Nothing left to do at this instant: move to the next step.
                self._time += 1
                return
            # A cancelled handle has had its callback cleared; running it
            # would fail, so just drop it.
            if not handle._cancelled:
                handle._run()

    def stop(self):
        """Ask the loop to stop before running any further callback."""
        self._stopping = True

    def run_forever(self):
        """Run the simulation until ``stop()`` is called or ``StopSim`` is raised.

        Raises:
            RuntimeError: if this loop is already running.
        """
        if self._running:
            raise RuntimeError('This event loop is already running')
        previous_loop = asyncio._get_running_loop()
        self._running = True
        # Register as the running loop so asyncio.get_running_loop() (and the
        # asyncio primitives that rely on it) work inside coroutines.
        asyncio._set_running_loop(self)
        try:
            while not self._stopping:
                self._run_once()
        except StopSim:
            pass
        finally:
            self._stopping = False
            self._running = False
            asyncio._set_running_loop(previous_loop)

    def run_until_complete(self, future):
        """Run the simulation until *future* is done and return its result.

        Args:
            future: a coroutine or future; coroutines are wrapped in a task
                bound to this loop.

        Returns:
            The result of *future*.

        Raises:
            Whatever exception *future* finished with, any exception that
            escapes a callback, or ``InvalidStateError`` if the simulation was
            stopped before *future* completed.
        """
        new_task = not isinstance(future, asyncio.Future)
        future = asyncio.ensure_future(future, loop=self)
        if new_task:
            # The task is owned by this call; don't warn if it is destroyed
            # while still pending.
            future._log_destroy_pending = False

        def _stop_when_done(fut):
            self.stop()

        future.add_done_callback(_stop_when_done)
        try:
            self.run_forever()
        except BaseException:
            if new_task and future.done() and not future.cancelled():
                # Mark the exception as retrieved so asyncio does not log
                # "exception was never retrieved" for a task nobody else owns.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_stop_when_done)
        return future.result()
def delay(time, result=None, *, loop=None):
    """Build a coroutine function that waits for *time* loop time units.

    Args:
        time: delay handed to ``loop.call_later``; its unit is whatever the
            loop's clock uses.
        result: value returned by the coroutine once the delay has elapsed.
        loop: event loop to schedule on.  When ``None``, the loop returned by
            ``asyncio.get_event_loop()`` at the time the coroutine runs is
            used.

    Returns:
        A zero-argument coroutine function.  Each call produces a fresh
        coroutine that completes with *result* after the delay.
    """
    def _resolve(future, value):
        # Mirror asyncio.sleep: a waiter that was cancelled in the meantime
        # must not receive a result (set_result would raise).
        if not future.cancelled():
            future.set_result(value)

    async def _delay():
        """Coroutine that completes after the configured delay."""
        # based on asyncio.sleep
        target_loop = loop if loop is not None else asyncio.get_event_loop()
        future = asyncio.Future(loop=target_loop)
        timer = target_loop.call_later(time, _resolve, future, result)
        try:
            return await future
        finally:
            # Drop the timer if the wait ended early (e.g. on cancellation).
            timer.cancel()

    return _delay
def now(loop=None):
    """Return the current time reported by *loop*.

    When *loop* is ``None`` the loop returned by ``asyncio.get_event_loop()``
    is queried instead.
    """
    active_loop = asyncio.get_event_loop() if loop is None else loop
    return active_loop.time()
def always(cond):
    """Decorator factory: run a function every time *cond* completes.

    Args:
        cond: zero-argument callable returning an awaitable.  It is called
            anew before every invocation of the decorated function.

    Returns:
        A decorator that turns a plain zero-argument function into a
        coroutine function.  The resulting coroutine loops forever, awaiting
        ``cond()`` and then calling the function; it only ends when an
        exception (or cancellation) propagates out of it.
    """
    def decorator(func):
        # Preserve the decorated function's name and docstring on the wrapper.
        @functools.wraps(func)
        async def wrapper():
            while True:
                await cond()
                func()
        return wrapper
    return decorator
@always(delay(3))
def hi():
    """Print the current loop time followed by ``hi``.

    Decorated with ``always(delay(3))``, so calling ``hi()`` yields a
    coroutine that runs this body after each 3-unit delay, indefinitely.
    """
    print(now(), 'hi')
def main():
    """Install a fresh ``SimEventLoop`` and run the ``hi`` process on it.

    ``hi`` never completes on its own, so this call only returns if the
    simulation is stopped or an exception propagates.
    """
    sim_loop = SimEventLoop()
    asyncio.set_event_loop(sim_loop)
    process = hi()
    sim_loop.run_until_complete(process)
# Run the demo simulation only when executed as a script, not on import.
if __name__ == '__main__':
    main()
@sheridp
Copy link

sheridp commented May 20, 2017

Hi jck, I recently had the same idea to attempt our TLM with asyncio, mostly to gain access to the existing features like locks and semaphores. Did this experiment work? Any chance that myhdl might move to asyncio?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment