Skip to content

Instantly share code, notes, and snippets.

@fpom
Created February 17, 2022 15:07
Show Gist options
  • Save fpom/e74e4c896de4b4ad429ab42cbf631bc4 to your computer and use it in GitHub Desktop.
Save fpom/e74e4c896de4b4ad429ab42cbf631bc4 to your computer and use it in GitHub Desktop.
Implementation of asyncio.queues.Queue for Brython
from collections import deque
from browser.aio import Future
class QueueEmpty (Exception) :
pass
class QueueFull (Exception) :
pass
class Queue (object) :
def __init__ (self, maxsize=0) :
self.maxsize = maxsize
self.data = deque(maxlen=maxsize or None)
self.readers = deque()
self.writers = deque()
self.joiners = deque()
self.tasks = 0
def qsize (self) :
return len(self.data)
def empty (self) :
return self.qsize() == 0
def full (self) :
return self.maxsize and self.qsize() == self.maxsize
async def get (self) :
if self.empty() :
future = Future()
def reader (val) :
# work around Brython bug #1897 when val is None
if val is None :
val = self # use self as replacement for None
# / work around
future.set_result(val)
self.readers.append(reader)
item = await future
# work around Brython bug #1897
if item is self :
item = None # replace back self with None
# / work around
return item
item = self.get_nowait()
if self.writers :
# unblock one writer
writer = self.writers.popleft()
writer()
return item
def get_nowait (self) :
try :
return self.data.popleft()
except IndexError :
raise QueueEmpty()
async def put (self, item) :
if self.full() :
future = Future()
def writer () :
self.put_nowait(item)
future.set_result(True)
self.writers.append(writer)
await future
return
if self.readers :
# directly pass item to the reader
self.tasks += 1
reader = self.readers.popleft()
reader(item)
else :
# self.tasks is incremented in put_nowait
self.put_nowait(item)
def put_nowait (self, item) :
if self.full() :
raise QueueFull()
self.data.append(item)
self.tasks += 1
async def join (self) :
if self.tasks > 0 :
future = Future()
def setres () :
future.set_result(True)
await future
def task_done (self) :
if self.tasks == 0 :
raise ValueError("no tasks")
self.tasks -= 1
if tasks == 0 :
for joiner in self.joiners :
joiner()
##
## test
##
from browser.aio import sleep, run
q = Queue(4)
async def producer () :
for i in range(8) :
print("put", i)
await q.put(i)
print("...put", i)
await sleep(1)
for i in range(8, 16) :
print("put", i)
await q.put(i)
print("...put", i)
await sleep(0)
async def consumer () :
await sleep(1)
for i in range(15) :
print("get", i)
await q.get()
print("...get", i)
sleep(0)
run(producer())
run(consumer())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment