Python ray queue for the producer-consumer pattern; prefetching chunks for periodic consumption
__author__ = "Elad Nachmias"
__email__ = "eladnah@gmail.com"
__date__ = "2023-10-19"
import time
from typing import Generic, List, Optional, TypeVar
import asyncio
import threading
import ray
from ray.exceptions import RaySystemError
T = TypeVar('T')
class ProducerConsumerRayQueue(Generic[T]):
"""
This is a producer-consumer queue that can be used to pass data between two ray remote tasks/workers. It is especially designed for
non-blocking operations while efficiently promoting continuous progress (more details in the documentation below).
This ray-queue is adapted for producer-consumer workloads where it is known that the consumer periodically polls the queue.
It makes sure to prefetch results at the consumer (there is always a pending get request in the background at the consumer end).
The get operation doesn't block until completion. Rather, it just ensures there is a pending request, and momentarily polls it
(a single non-blocking poll per get call) to check whether the prefetched request has already completed and been fetched locally (and is
thus ready to be passed to the user without further blocking operations).
This pattern avoids alternative non-blocking solutions like having a dedicated daemon thread waiting for results (at the
consumer end), which can be prone to hanging-related issues.
Another capability of this queue is chunked operations. Both the put and consume calls operate over chunks of items (passed in /
returned accordingly). Note that this is different from a regular queue where each item is a chunk, because here a
produced chunk is not consumed atomically. Instead, the pushed chunks are appended to one flattened queue (in the
underlying actor), and consumers then take pieces of this already-aggregated queue, regardless of the chunking with which the items
were pushed. So a consumed chunk can potentially contain items from multiple pushed chunks, and a pushed chunk can
potentially be split across multiple consume calls (the latter is only true if `max_fetch_chunk_size` is given - otherwise a consumer
always consumes the entire available queue (spilling the whole queue on every single consume call), so pushed chunks are not split in
this case, but can still be joined). A short illustration follows below.
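For illustration, a minimal sketch of the flattening semantics (the item values and the `max_fetch_chunk_size=2` setting are
arbitrary, and the exact interleaving observed depends on timing):
    queue.put_chunk(['a', 'b', 'c'])
    queue.put_chunk(['d'])
    # Over successive consume_chunk() calls, a consumer may then observe, e.g.:
    #   ['a', 'b'] and later ['c', 'd'] - a consumed chunk mixing items from two different pushed chunks,
    #   while the first pushed chunk is split across two consume calls.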
Comparison to Ray's builtin `ray.util.queue.Queue`:
Blocking:
Generally, note that there are two different aspects to which the notion of "blocking" (or non-blocking) can apply.
As the queue is distributed and items are effectively passed across different nodes, the first "blocking" aspect
is simply whether to wait for the result of the remote call to the underlying actor that manages & synchronizes the queue. The
second "blocking" aspect is whether this actor itself deliberately delays the response to such a remote request whenever the
queue state currently doesn't allow the requested operation to take place (namely, the queue is empty in a get() call or the queue
is at maximal capacity in a put() call). Such a postponed-response mechanism is sometimes referred to as "long-polling".
Ray's builtin queue has a boolean `blocking=False` flag. It relates only to the second-mentioned aspect. In particular, it always blocks
wrt the first-mentioned aspect for all operations (get, put, empty). That is, setting `blocking=False` only controls whether the
underlying actor delays its response until the operation can be fulfilled. More specifically, ray.get() is called over a remote call to
the underlying queue actor before the operation returns. In contrast, the queue here by design never blocks wrt the first-mentioned
aspect. The second blocking aspect is controlled only coarsely in Ray's queue via the `blocking` flag and `timeout` param. In contrast,
the queue implemented here supports *finer* control of this blocking aspect, by setting both the params `min_fetch_chunk_size`
and `timeout_waiting_for_min_chunk_size` - the call returns the available items once the first of the two conditions is met, and finishes
successfully either way (see more details in the parameters description in the c'tor). The combination of these two encourages a
healthier workflow that both avoids the network overhead of small and frequent consecutive remote calls, and at the same time avoids
big latencies and even deadlocks. Such a combination is not supported by Ray's queue. In conclusion, this queue differs from Ray's
queue in both blocking aspects.
Chunking / batched operations:
Ray's builtin queue has support for chunked operations via the put/get_nowait_batch() operations. However, get_nowait_batch()
requires a fixed `num_items` param (the result would be a batch of this exact size). This means that the batched get call won't
return until the requested number of items becomes available. There is also no `timeout` for this operation. This requires the
user to tune this number in advance both to avoid suffering from long latencies (or sometimes a deadlock) and to adapt it to the
rate of the producer. Ray's queue is still missing an operation to simply spill all the currently-available items in the queue.
Therefore, to implement a producer-consumer pattern using Ray's queue, the user has to either limit the queue size to 1 (blocking
the producer until the consumer empties the queue), or make the consumer perform get() operations within a loop until the queue is
empty, where each loop iteration involves blocking remote operations (including the loop exit condition). Otherwise, the queue size
could grow faster than it is consumed. The queue implemented here, in contrast, has the finer parameters `min/max_fetch_chunk_size`
(min/max number of items in the returned batch/chunk), which allow a "spill"-like operation over the entire queue. This
ultimately solves the rate(item consumption) < rate(item production) issue.
Full capacity policy:
Ray's queue allows setting the maximum queue capacity. Trying to put() an item into a queue at full capacity would either wait for the
queue to be partially consumed (in the "blocking" setting), or raise an exception on a full queue (in the "non-blocking" setting). The
queue implemented here supports either raising an exception on a full queue, or truncating the queue to the maximal size (cyclic-buffer),
as illustrated below.
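For example, a sketch of the 'truncate' policy with `max_queue_size=3` (the item values are arbitrary):
    queue.put_chunk(['a', 'b'])
    queue.put_chunk(['c', 'd'])  # the queue would now exceed 3 items, so it is truncated to its latest tail
    # The queue then holds ['b', 'c', 'd'] - the oldest item 'a' has been dropped.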
Only this class is exposed to and used by the user. An object of this class is used directly by the end-client (consumer/producer) in its
main process (and same main thread). Under the hood, it holds a handle to the underlying queue actor that synchronizes the queue
and is accessed internally by these client-side objects. In other words, this object is used as a communicator to the actual
queue actor. The underlying queue actor is created here, without the need for the intervention / awareness of the user.
Thus, the user is only aware of and works with these main "communicator" objects of this main class. This object can be passed on (by
copy) to the user's other actors - it is typically created once somewhere, and then passed to the relevant end-points (producers and
consumers) that should communicate. A minimal usage sketch follows below.
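Example usage (a minimal sketch; it assumes `ray.init()` was already called, and that `produce_items`, `handle_item` and
`do_other_periodic_work` are hypothetical user functions):
    queue = ProducerConsumerRayQueue(min_fetch_chunk_size=1, timeout_waiting_for_min_chunk_size=0.1)

    @ray.remote
    def producer(queue: ProducerConsumerRayQueue):
        while True:
            queue.put_chunk(produce_items())  # push a freshly produced chunk of items (non-blocking)

    @ray.remote
    def consumer(queue: ProducerConsumerRayQueue):
        while True:
            for item in queue.consume_chunk():  # non-blocking; may return an empty list
                handle_item(item)
            do_other_periodic_work()  # the consumer is assumed to poll the queue periodically

    producer.remote(queue)
    consumer.remote(queue)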
"""
def __init__(self,
min_fetch_chunk_size: int = 1,
max_fetch_chunk_size: Optional[int] = None,
timeout_waiting_for_min_chunk_size: Optional[float] = None,
max_queue_size: int = 0,
max_queue_size_exceed_handling_policy: str = 'truncate', # in {'truncate', 'raise'}
actor_options: Optional[dict] = None):
"""
:param min_fetch_chunk_size: The minimum number of items to wait for in a consume_chunk() operations. Namely, the response to the
call would be suspended until such number of items become available in the queue (or until timeout was
reached if set - the first that is met would actually decide when the call would return).
:param max_fetch_chunk_size: The maximum number of items in the chunk to return to the user on consume_chunk() operations. If given,
and the |queue| > max_fetch_chunk_size, then only the top max_fetch_chunk_size items are popped from
the queue and returned for this consume call. If not given (None), each consume_chunk() operation just
spills all the available items in the queue entirely. Note that either way this parameter won't cause
the call to be blocked (if enough items are available wrt the minimum requirement - the call returns).
:param timeout_waiting_for_min_chunk_size: Optional. Timeout for returning from a consume_chunk() operation, even if not enough items
are available (less than `min_fetch_chunk_size`). In such a case, the operation is not
considered as failed; instead it just returns all available items although
`min_fetch_chunk_size` has not been reached (can be an empty list). Namely, the
consume_chunk() operation ends successfully (either way) as soon as the first of the given
conditions (timeout / min_fetch_chunk_size) is met.
:param max_queue_size: Optional. If set, limit the number of items in the queue to this given size using the chosen enforcement
policy (defined below).
:param max_queue_size_exceed_handling_policy: Defines the policy for how to enforce the maximal queue size (from above; if set).
Could be set either to 'truncate' (keep the latest tail - remove oldest samples), or
to 'raise' (raise an exception on put_chunk() operation when the queue exceeds the
maximal size).
:param actor_options: Optional. Additional ray remote options to pass to the underlying ray queue actor. For example, allows
explicitly setting the node / placement-group this actor would be instantiated in.
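For example (a sketch; the parameter values are arbitrary, and `num_cpus` is just one possible ray remote option):
    queue = ProducerConsumerRayQueue(
        min_fetch_chunk_size=8, max_fetch_chunk_size=64, timeout_waiting_for_min_chunk_size=0.5,
        max_queue_size=10_000, max_queue_size_exceed_handling_policy='truncate',
        actor_options={'num_cpus': 0})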
"""
assert max_queue_size_exceed_handling_policy in {'truncate', 'raise'}
self._pending_get_future = None
actor_options = {} if actor_options is None else actor_options
try:
self._actor = _ProducerConsumerRayQueueAsyncActor.options(**actor_options).remote(
max_queue_size=max_queue_size, max_queue_size_exceed_handling_policy=max_queue_size_exceed_handling_policy)
except RaySystemError:
# Async actors are not supported in ray's local run mode (which is active in debug mode). We use a threaded actor instead in that
# case. The number of threads should be at least 2 to allow a produce operation to be served while a consume operation takes
# place (and might wait for samples, as explained in more detail in the actor's implementation below).
nr_concurrent_threads = 2
assert 'max_concurrency' not in actor_options
threaded_actor_type = _ProducerConsumerRayQueueThreadedActor.options(
**actor_options, max_concurrency=nr_concurrent_threads)
self._actor = threaded_actor_type.remote(
max_queue_size=max_queue_size,
max_queue_size_exceed_handling_policy=max_queue_size_exceed_handling_policy,
nr_concurrent_threads=nr_concurrent_threads)
self._min_fetch_chunk_size = min_fetch_chunk_size
self._max_fetch_chunk_size = max_fetch_chunk_size
self._timeout_waiting_for_min_chunk_size = timeout_waiting_for_min_chunk_size
def consume_chunk(self) -> List[T]:
"""
This is a non-blocking polling operation. It creates a request when called for the first time, stores its future
(promise) as a member of this class (to be polled in succeeding calls), and returns an empty list. Succeeding calls
then poll (without blocking) the future of that same previous request. Eventually, there will be a call (to this method) in which
the results of this request are ready. In that call, the resulting chunk is returned to the user and a new future request
is created (as a prefetch). Thus, a request that has been dispatched to the queue actor during a certain call to
`consume_chunk()` is not necessarily (and typically is not) the same request whose results are returned in that very call.
The hidden assumption is that the workload is endlessly continuous, so we can always make a request for future consumption calls.
Note that this object is created once and then passed to the producers & consumers. Namely, upon creation it is not yet known
whether this end is used as a consumer. Thus, there is no initial pending prefetch request. The initial consume request (to the queue
actor) is created only upon the first call to this consume method. A typical call pattern is sketched below.
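A typical call pattern at the consumer end (a sketch; `handle_item` is a hypothetical user function):
    chunk = queue.consume_chunk()  # 1st call: dispatches the prefetch request and returns [] immediately
    ...                            # the consumer performs its other periodic work in the meantime
    chunk = queue.consume_chunk()  # a later call finds the prefetched request completed, returns its items,
    for item in chunk:             # and dispatches a new prefetch request for subsequent calls
        handle_item(item)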
"""
self._prefetch()
finished_fetching, _ = ray.wait([self._pending_get_future], timeout=0., fetch_local=True)
fetched_chunk = []
if finished_fetching:
fetched_chunk = ray.get(self._pending_get_future)
self._pending_get_future = None
self._prefetch()
return fetched_chunk
def put_chunk(self, items: List[T]):
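"""
Push a chunk of items to the queue. This is a non-blocking (fire-and-forget) operation: it only dispatches a remote call to the
underlying queue actor and returns immediately, without waiting for the items to actually be appended to the remote queue.
"""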
self._actor.put_chunk.remote(items)
def clear(self, block: bool = False) -> Optional[ray.ObjectRef]:
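"""
Clear all items currently stored in the underlying queue actor. If `block` is set, wait until the clearance is completed;
otherwise, return the ray future (ObjectRef) of the remote clear operation so the caller can optionally wait on it.
"""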
clear_future = self._actor.clear.remote()
# When cleared from a consumer end, ensure that a formerly dispatched pending fetch operation from that consumer is discarded, to
# guarantee that pre-clear items won't be returned to this consumer after the clearance. This property is irrelevant across consumers,
# as the operations across different consumers are not assumed to be synchronized anyway.
self._pending_get_future = None
if block:
ray.get(clear_future)
else:
return clear_future
def _prefetch(self):
if self._pending_get_future is None:
self._pending_get_future = self._actor.consume_chunk.remote(
min_size=self._min_fetch_chunk_size, max_size=self._max_fetch_chunk_size,
timeout_waiting_for_min_size=self._timeout_waiting_for_min_chunk_size)
@ray.remote
class _ProducerConsumerRayQueueAsyncActor:
"""
The actor is not exposed to the queue end-user, and is known only to the exposed queue handle (defined above). The actor is used as a
centralized means of managing & maintaining the queue itself. The exposed instance (the one used by the user) creates an actor, holds
a reference to it, and makes remote calls to it under-the-hood to perform the actual queue operations.
There are two implementations of the queue actor (async & threaded). This is the async implementation, which is generally preferable.
Here we explain async & threaded actors, why they're important here, what problem they solve, what their limitations are,
and how they differ.
The most significant and delicate issue with the queue actor is to allow consume & produce operations to be served by the actor
"concurrently". In other words, always allow a "produce" operation to be scheduled and served, even if an unlimited number of consume
operations have already been triggered before (and maybe started being served). Otherwise, trying to serve the consume operations
without serving the produce operation could lead to a deadlock, where the consume operations indefinitely wait for enough items
in the queue that would never become available. In more detail, as a consume operation may not return immediately (it waits for enough
items to be available), the actor should be able to execute a pending produce call before serving that consume call has finished.
The (simpler) threaded actor (not this implementation, but the one below) has a limitation in guaranteeing this desired property
(explained in detail in the threaded actor implementation below).
Here, in the async implementation, the two main queue operations (consume_chunk, put_chunk) are implemented as async
functions. That is, multiple invoked calls can all start being executed "together" in an asynchronous manner as
coroutines. Control is passed between these different coroutines during their lifetimes. A consume coroutine
can yield (if not enough items are available, for example) in favor of scheduling another pending produce operation. This, in practice,
solves the issue described above.
"""
def __init__(self, max_queue_size: int = 0, max_queue_size_exceed_handling_policy: str = 'truncate'):
assert max_queue_size_exceed_handling_policy in {'truncate', 'raise'}
self._queue: List[T] = []
self._queue_lock = asyncio.Lock() # for accessing the queue concurrently by the reader & writers
self._new_items_notifier_sem = asyncio.Semaphore(0) # for waking-up awaiting consumer requests when new data arrives
self._serial_consumers_serving_sem = asyncio.BoundedSemaphore(1) # for serving consumers' requests serially
self._max_queue_size = max_queue_size
self._max_queue_size_exceed_handling_policy = max_queue_size_exceed_handling_policy
async def consume_chunk(self, min_size: int = 1, max_size: Optional[int] = None,
timeout_waiting_for_min_size: Optional[float] = None) -> List[T]:
assert min_size >= 0
assert max_size is None or min_size <= max_size
# ~FIFO serving of readers - the simplest policy to guarantee no starvation of calls with a relatively big requested `min_size`
start_time = time.monotonic()
async with self._serial_consumers_serving_sem:
# Wait in a loop until enough items are available. Note that we wait while holding the consumer-serving semaphore. That means
# that no other consumer's request can be served while this request waits to be served. This is for avoiding starvation of big
# requests.
should_wait_for_new_items = False
while True:
if should_wait_for_new_items:
rest_timeout = None if timeout_waiting_for_min_size is None else \
max(0., timeout_waiting_for_min_size - (time.monotonic() - start_time))
try:
# wait for a new chunk of items to be added
await asyncio.wait_for(self._new_items_notifier_sem.acquire(), timeout=rest_timeout)
except asyncio.TimeoutError:
pass # timeout passed - should return a chunk although min_size might not be available yet
# we lock the queue inside the attempts loop - so it won't stay locked while we wait
async with self._queue_lock:
if len(self._queue) < min_size and (
timeout_waiting_for_min_size is None or time.monotonic() - start_time < timeout_waiting_for_min_size):
should_wait_for_new_items = True
continue
if max_size is None:
returned_chunk = self._queue.copy()
self._queue.clear()
else:
returned_chunk = self._queue[:max_size].copy()
self._queue = self._queue[max_size:]
return returned_chunk
async def put_chunk(self, chunk: List[T]):
async with self._queue_lock:
self._queue.extend(chunk)
if self._max_queue_size and len(self._queue) > self._max_queue_size:
if self._max_queue_size_exceed_handling_policy == 'truncate':
self._queue = self._queue[-self._max_queue_size:]
elif self._max_queue_size_exceed_handling_policy == 'raise':
raise RuntimeError(f'Max queue size [{self._max_queue_size}] has been exceeded.')
else:
raise ValueError(f'Unsupported max queue size handling policy `{self._max_queue_size_exceed_handling_policy}`.')
# We keep the semaphore bounded by 1 to avoid redundant wake-ups of consumer requests. The following condition practically makes
# this semaphore bounded by 1 (we don't use a `BoundedSemaphore` because we'd like to initialize it to a value of 0 but bound
# it to 1, and we cannot call acquire() from __init__ as it's an async method).
# Note that while holding the queue-lock no other release() could be made to this semaphore (this is the only place it's
# being released); that is, after the "if"-condition below is evaluated to be true (the semaphore is locked), this status
# cannot change before the increment statement (releasing the semaphore) within the if-branch.
if self._new_items_notifier_sem.locked():
self._new_items_notifier_sem.release() # notify consumers that there are new items available in the queue
async def clear(self):
async with self._queue_lock:
self._queue.clear()
@ray.remote
class _ProducerConsumerRayQueueThreadedActor:
"""
For more information about the queue actor, see documentation in `_ProducerConsumerRayQueueAsyncActor` above.
Async actors (implemented above) are not supported in local mode (used in debug). Thus, we implement here a (generally less preferable)
threaded actor option for the local run mode. This (simpler) threaded actor allows
serving a predefined n calls concurrently, but it doesn't guarantee preserving a thread for a future produce operation - so it is
possible that n consume operations fully utilize these n available threads at the same time (blocking them, as there are
currently not enough available items in the queue) without preserving an available thread for another pending *produce* operation -
which would cause a deadlock (in the case of multiple consumers).
To avoid such deadlocks, we exit a consume call immediately if consumers fully utilize all available threads. This guarantees that an
available thread is preserved in favor of a future produce call.
Note: Having said that, newer versions of Ray (currently not available to us) have a feature called concurrency groups that
can solve this issue by strictly & explicitly assigning a given number of threads to each kind of operation.
"""
def __init__(self, max_queue_size: int = 0, max_queue_size_exceed_handling_policy: str = 'truncate',
nr_concurrent_threads: Optional[int] = None):
assert max_queue_size_exceed_handling_policy in {'truncate', 'raise'}
self._queue: List[T] = []
# For accessing the queue concurrently by the reader & writers.
self._queue_lock = threading.Lock()
# For waking-up awaiting consumer requests when new data arrives.
# Keeping the semaphore bounded helps avoiding redundant consumer requests wake-ups.
self._new_items_notifier_sem = threading.BoundedSemaphore(1) # set max value to 1
self._new_items_notifier_sem.acquire() # set initial value to 0 (no new items currently)
# For serving consumers' requests serially (guarantee serving all eventually).
self._serial_consumers_serving_sem = threading.BoundedSemaphore(1)
self._max_queue_size = max_queue_size
self._max_queue_size_exceed_handling_policy = max_queue_size_exceed_handling_policy
self._nr_concurrent_threads = nr_concurrent_threads
self._cur_nr_concurrently_served_consume_operations = 0
self._cur_served_consumers_counter_lock = threading.Lock()
def consume_chunk(self, min_size: int = 1, max_size: Optional[int] = None,
timeout_waiting_for_min_size: Optional[float] = None) -> List[T]:
assert min_size >= 0
assert max_size is None or min_size <= max_size
# Guarantee that at least one thread is kept available for a future produce operation, if the number of threads is known.
if self._nr_concurrent_threads is not None:
with self._cur_served_consumers_counter_lock:
assert self._nr_concurrent_threads >= self._cur_nr_concurrently_served_consume_operations
if self._nr_concurrent_threads == self._cur_nr_concurrently_served_consume_operations:
# Exit a consume call immediately if consumers fully utilize all available threads (to avoid deadlock).
return []
self._cur_nr_concurrently_served_consume_operations += 1
start_time = time.monotonic()
# ~FIFO serving of readers - the simplest policy to guarantee no starvation of calls with a relatively big requested `min_size`
with self._serial_consumers_serving_sem:
# Wait in a loop until enough items are available. Note that we wait while holding the consumer-serving semaphore. That means
# that no other consumer's request can be served while this request waits to be served. This is for avoiding starvation of big
# requests.
should_wait_for_new_items = False
while True:
if should_wait_for_new_items:
rest_timeout = None if timeout_waiting_for_min_size is None else \
max(0., timeout_waiting_for_min_size - (time.monotonic() - start_time))
# Note: The threading semaphore's acquire() method returns whether it succeeded acquiring within the given time span. However, we
# don't care here whether the timeout passed - we just want to limit the waiting. Below we detect an exceeded timeout and avoid
# waiting for new items in that case.
self._new_items_notifier_sem.acquire(timeout=rest_timeout) # wait for a new chunk of items to be added
with self._queue_lock: # we lock the queue inside the attempts loop - so it won't stay locked while we wait
if len(self._queue) < min_size and (
timeout_waiting_for_min_size is None or time.monotonic() - start_time < timeout_waiting_for_min_size):
should_wait_for_new_items = True
continue
if max_size is None:
returned_chunk = self._queue.copy()
self._queue.clear()
else:
returned_chunk = self._queue[:max_size].copy()
self._queue = self._queue[max_size:]
if self._nr_concurrent_threads is not None:
with self._cur_served_consumers_counter_lock:
self._cur_nr_concurrently_served_consume_operations -= 1
return returned_chunk
def put_chunk(self, chunk: List[T]):
with self._queue_lock:
self._queue.extend(chunk)
if self._max_queue_size and len(self._queue) > self._max_queue_size:
if self._max_queue_size_exceed_handling_policy == 'truncate':
self._queue = self._queue[-self._max_queue_size:]
elif self._max_queue_size_exceed_handling_policy == 'raise':
raise RuntimeError(f'Max queue size [{self._max_queue_size}] has been exceeded.')
else:
raise ValueError(f'Unsupported max queue size handling policy `{self._max_queue_size_exceed_handling_policy}`.')
try:
self._new_items_notifier_sem.release() # notify consumers that there are new items available in the queue
except ValueError:
# As this semaphore is bounded to 1, we get a ValueError when trying to increment while the current value is already 1.
# This doesn't pose an issue - we just want to make sure here that the value is 1, so that a consumer will be woken up.
pass
def clear(self):
with self._queue_lock:
self._queue.clear()