Last active
September 28, 2017 03:53
-
-
Save virantha/93f052ea6f1eef5ddf483487edcf9289 to your computer and use it in GitHub Desktop.
process_buffer2.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from curio import Queue, CancelledError | |
class Port: | |
def __init__(self): | |
self.chan = None | |
class InputPort(Port): | |
async def recv(self): | |
tok = await self.chan.recv() | |
return tok | |
class OutputPort(Port): | |
async def send(self, val): | |
await self.chan.send((val)) | |
def connect(a, b, name=''): | |
# Connect ports together by instantiating a channel | |
chan = Channel(name) | |
# Check to make sure the ports have not been connected previously to other channels! | |
assert not a.chan, f"Channel {a} has already been connected!" | |
assert not b.chan, f"Channel {b} has already been connected!" | |
# Check to make sure the two ports are of opposite type (input/output) | |
if isinstance(a, InputPort): | |
assert isinstance(b, OutputPort), f"Channel {a} and {b} are both input ports!" | |
# Store the ports this channel is connected to | |
# b ---chan---> a | |
chan.l = b | |
chan.r = a | |
else: | |
assert isinstance(b, InputPort), f"Channel {a} and {b} are both output ports!" | |
# Store the ports this channel is connected to | |
# a ---chan---> b | |
chan.l = a | |
chan.r = b | |
# Now assign the channel to the two ports | |
a.chan = chan | |
b.chan = chan | |
class Process: | |
next_id = 0 | |
def __init__(self, name): | |
self.name = name | |
self.id = Process.next_id | |
Process.next_id += 1 | |
def __str__(self): | |
return f"{self.name}.{self.id}" | |
def __repr__(self): | |
return f"{type(self).__name__}('{self.name}')" | |
def message(self, m): | |
print(f"{self}: {m}") | |
class Source(Process): | |
def __init__(self, name, length, srcval): | |
super().__init__(name) | |
self.val = srcval | |
self.length = length | |
self.R = OutputPort() | |
async def exec(self): | |
for i in range(self.length): | |
self.message(f"sending {self.val}") | |
await self.R.send(self.val) | |
self.message(f"sent {self.val}") | |
self.message("terminated") | |
class Sink(Process): | |
def __init__(self, name): | |
super().__init__(name) | |
self.L = InputPort() | |
async def exec(self): | |
tok_count = 0 | |
try: | |
while True: | |
tok = await self.L.recv() | |
tok_count += 1 | |
self.message(f"received {tok}") | |
except CancelledError: | |
self.message(f"{tok_count} tokens received") | |
class Buffer(Process): | |
def __init__(self, name): | |
super().__init__(name) | |
self.L = InputPort() | |
self.R = OutputPort() | |
async def exec(self): | |
while True: | |
tok = await self.L.recv() | |
self.message(f"received {tok}") | |
self.message(f"sending {tok}") | |
await self.R.send(tok) | |
class Channel: | |
def __init__(self, name): | |
self.name = name | |
self.q = Queue(maxsize=1) # Max buffering of 1 | |
async def send(self, val): | |
await self.q.put(val) | |
async def recv(self): | |
tok = await self.q.get() | |
await self.q.task_done() | |
return tok | |
async def close(self): | |
await self.q.join() | |
from curio import run, spawn | |
async def system(): | |
N = 10 # How many buffers in our linear pipeline | |
# Instantiate the processes | |
src = Source('src1', 10, 1) | |
buf = [Buffer(f'buf[{i}]') for i in range(N)] | |
snk = Sink('snk') | |
# Connect the processes with the channels | |
connect(src.R, buf[0].L) | |
for i in range(1, N): | |
connect(buf[i-1].R, buf[i].L) | |
connect(snk.L, buf[N-1].R) | |
# Start the processes | |
p_src = await spawn(src.exec()) | |
p_snk = await spawn(snk.exec()) | |
p_buf = [await spawn(buf[i].exec()) for i in range(N)] | |
# Wait for the source to finish sending all its values | |
await p_src.join() | |
# Cancel the remaining processes | |
for i in range(N): | |
await p_buf[i].cancel() | |
await p_snk.cancel() | |
if __name__=='__main__': | |
run(system(), with_monitor=True) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment