Created
February 17, 2022 15:07
-
-
Save fpom/e74e4c896de4b4ad429ab42cbf631bc4 to your computer and use it in GitHub Desktop.
Implementation of asyncio.queues.Queue for Brython
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque | |
from browser.aio import Future | |
class QueueEmpty(Exception):
    """Raised by ``Queue.get_nowait()`` when the queue holds no item."""
class QueueFull(Exception):
    """Raised by ``Queue.put_nowait()`` when the queue is at capacity."""
class Queue(object):
    """FIFO queue for coroutines, modelled on ``asyncio.Queue``.

    Blocking is implemented with ``Future`` objects: a coroutine that has
    to wait parks a callback in ``readers``, ``writers`` or ``joiners`` and
    awaits a future that the callback resolves later.

    Attributes:
        maxsize: capacity given to the constructor; a value that is falsy
            or negative means "unbounded".
        data: deque holding the queued items.
        readers: callbacks of ``get()`` calls waiting for an item.
        writers: callbacks of ``put()`` calls waiting for free space.
        joiners: callbacks of ``join()`` calls waiting for ``tasks == 0``.
        tasks: number of items put so far and not yet acknowledged with
            ``task_done()``.
    """

    # Private stand-in for None inside futures.  The original author notes
    # that Brython bug #1897 affects futures resolved with None; a
    # dedicated sentinel (rather than the queue itself) keeps every
    # possible item, including the queue object, round-trippable.
    _NONE = object()

    def __init__(self, maxsize=0):
        """Create a queue holding at most *maxsize* items (<= 0: no limit)."""
        self.maxsize = maxsize
        # deque rejects a negative maxlen, so map "unbounded" to None.
        bounded = bool(maxsize) and maxsize > 0
        self.data = deque(maxlen=maxsize if bounded else None)
        self.readers = deque()
        self.writers = deque()
        self.joiners = deque()
        self.tasks = 0

    def qsize(self):
        """Return the number of items currently stored."""
        return len(self.data)

    def empty(self):
        """Return True when no item is stored."""
        return self.qsize() == 0

    def full(self):
        """Return True when the queue is bounded and at capacity."""
        limit = self.maxsize
        return bool(limit) and limit > 0 and self.qsize() >= limit

    async def get(self):
        """Remove and return the oldest item, waiting until one exists."""
        if not self.empty():
            return self.get_nowait()
        future = Future()

        def reader(val):
            # Never resolve the future with None (see _NONE above).
            future.set_result(self._NONE if val is None else val)

        self.readers.append(reader)
        item = await future
        return None if item is self._NONE else item

    def get_nowait(self):
        """Remove and return the oldest item without waiting.

        Raises:
            QueueEmpty: if no item is stored.
        """
        try:
            item = self.data.popleft()
        except IndexError:
            raise QueueEmpty() from None
        if self.writers:
            # A slot was just freed: let the longest-waiting put() proceed.
            self.writers.popleft()()
        return item

    async def put(self, item):
        """Add *item* to the queue, waiting for free space if it is full."""
        if not self.full():
            self.put_nowait(item)
            return
        future = Future()

        def writer():
            # Called by get_nowait() right after it freed a slot.
            self.put_nowait(item)
            future.set_result(True)

        self.writers.append(writer)
        await future

    def put_nowait(self, item):
        """Add *item* to the queue without waiting.

        If a ``get()`` is already waiting, the item is handed to it directly
        instead of being stored, so waiting readers are never left behind.

        Raises:
            QueueFull: if the queue is at capacity.
        """
        if self.full():
            raise QueueFull()
        self.tasks += 1
        if self.readers:
            self.readers.popleft()(item)
        else:
            self.data.append(item)

    async def join(self):
        """Wait until every item put so far was acknowledged by task_done()."""
        if self.tasks > 0:
            future = Future()

            def setres():
                future.set_result(True)

            # Register the callback so task_done() can wake this coroutine.
            self.joiners.append(setres)
            await future

    def task_done(self):
        """Acknowledge that one previously fetched item was fully processed.

        Raises:
            ValueError: if called more often than items were put.
        """
        if self.tasks == 0:
            raise ValueError("no tasks")
        self.tasks -= 1
        if self.tasks == 0:
            # Pop while firing so each join() future is resolved only once.
            while self.joiners:
                self.joiners.popleft()()
##
## test
##
from browser.aio import sleep, run | |
# Bounded queue (capacity 4) shared by the demo producer and consumer.
q = Queue(4)
async def producer():
    """Put the integers 0..15 on the shared queue ``q``, printing progress.

    NOTE(review): indentation was lost in the source this was recovered
    from.  The nesting below (a one-second pause between the two bursts,
    and a zero-length sleep after each put of the second burst) is a
    reconstruction -- confirm against the original.
    """
    for i in range(8):
        print("put", i)
        await q.put(i)
        print("...put", i)
    await sleep(1)
    for i in range(8, 16):
        print("put", i)
        await q.put(i)
        print("...put", i)
        await sleep(0)
async def consumer():
    """Wait one second, then fetch 15 items from the shared queue ``q``.

    NOTE(review): the producer puts 16 items while only 15 are fetched
    here, so one item stays queued -- confirm this is intended.
    NOTE(review): indentation was lost in the source; the sleep is assumed
    to sit inside the loop -- confirm.
    """
    await sleep(1)
    for i in range(15):
        print("get", i)
        await q.get()
        print("...get", i)
        # The sleep must be awaited; a bare call has no effect and the
        # loop would never yield here.
        await sleep(0)
# Start both demo coroutines with browser.aio.run.
run(producer())
run(consumer())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment