Last active October 21, 2023 17:18
Python ray queue for producer-consumer pattern; prefetching chunks for periodic consumption
__author__ = "Elad Nachmias"
__email__ = "eladnah@gmail.com"
__date__ = "2023-10-19"

import time
from typing import Generic, List, Optional, TypeVar
import asyncio
import threading

import ray
from ray.exceptions import RaySystemError

T = TypeVar('T')


class ProducerConsumerRayQueue(Generic[T]):
    """
    This is a producer-consumer queue that can be used to pass data between two Ray remote tasks/workers. It is especially
    designed for non-blocking operations while efficiently promoting continuous progress (more details in the documentation
    below).

    This Ray queue is adapted for producer-consumer workloads in which the consumer is known to poll the queue periodically. It
    makes sure to prefetch results at the consumer (always keeping a pending get request in the background at the consumer end).
    The get operation doesn't block until completion. Instead, it just ensures there's a pending request and momentarily polls it
    (a single non-blocking poll per get call) to check whether the prefetched request has already completed and been fetched
    locally (thus ready to be passed to the user without further blocking operations).
    This pattern avoids alternative non-blocking solutions, such as a dedicated daemon thread that waits for results (at the
    consumer end), which can be prone to hanging-related issues.
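
    A minimal sketch of this prefetch-and-poll pattern without Ray, assuming a `concurrent.futures` thread pool stands in for
    the queue's actor and `Future.done()` stands in for `ray.wait(..., timeout=0.)`. The names `PrefetchingConsumer` and
    `slow_fetch` are hypothetical and exist only for illustration:

    ```python
    import time
    from concurrent.futures import Future, ThreadPoolExecutor
    from typing import Callable, List, Optional


    class PrefetchingConsumer:
        def __init__(self, fetch_fn: Callable[[], List[int]], executor: ThreadPoolExecutor):
            self._fetch_fn = fetch_fn
            self._executor = executor
            self._pending: Optional[Future] = None

        def _prefetch(self) -> None:
            # Dispatch a new request only if none is pending.
            if self._pending is None:
                self._pending = self._executor.submit(self._fetch_fn)

        def consume_chunk(self) -> List[int]:
            self._prefetch()
            if not self._pending.done():  # a single non-blocking poll
                return []
            chunk = self._pending.result()  # already completed, so this does not block
            self._pending = None
            self._prefetch()  # immediately prefetch for a future call
            return chunk


    def slow_fetch() -> List[int]:
        time.sleep(0.05)  # simulated remote latency
        return [1, 2, 3]


    with ThreadPoolExecutor(max_workers=1) as pool:
        consumer = PrefetchingConsumer(slow_fetch, pool)
        first = consumer.consume_chunk()  # dispatches the first request and returns immediately
        time.sleep(0.2)  # the consumer does other work between polls
        second = consumer.consume_chunk()  # the prefetched result is ready by now
    print(first, second)  # [] [1, 2, 3]
    ```

    The first call only dispatches a request and returns an empty list; a later call picks up the prefetched result and
    immediately dispatches the next request.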

    Another capability of this queue is chunked operations. Both the put and consume calls operate on chunks of items (passed
    to / returned from them, respectively). Note that this is different from a regular queue in which each item is a chunk,
    because here a produced chunk is not consumed atomically. Instead, the pushed chunks are appended to one flattened queue (in
    the underlying actor), and consumers just take pieces of this already aggregated queue, regardless of how the pushed items
    were chunked. So, a consumed chunk can contain items from multiple pushed chunks, and a pushed chunk can be split across
    multiple consume calls. The latter is only possible if `max_fetch_chunk_size` is given; otherwise, a consumer always
    consumes the entire available queue (spilling the whole queue on every single consume call), so pushed chunks are not split
    in this case, although they can still be joined.
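
    A minimal sketch of these chunking semantics in plain Python (no Ray), assuming all pushes happen before any consume call.
    The helper `simulate_chunking` is hypothetical and is not part of this module:

    ```python
    from typing import List, Optional


    def simulate_chunking(pushed_chunks: List[List[int]], max_size: Optional[int]) -> List[List[int]]:
        # The pushed chunks are appended to one flattened list, so their boundaries are lost.
        flat_queue: List[int] = []
        for chunk in pushed_chunks:
            flat_queue.extend(chunk)
        consumed_chunks = []
        while flat_queue:
            if max_size is None:
                # No `max_fetch_chunk_size`: a single consume call spills the entire queue.
                taken, flat_queue = flat_queue, []
            else:
                # Pop at most `max_size` items from the head of the queue.
                taken, flat_queue = flat_queue[:max_size], flat_queue[max_size:]
            consumed_chunks.append(taken)
        return consumed_chunks


    print(simulate_chunking([[1, 2], [3, 4, 5]], max_size=2))     # [[1, 2], [3, 4], [5]]
    print(simulate_chunking([[1, 2], [3, 4, 5]], max_size=None))  # [[1, 2, 3, 4, 5]]
    ```

    With `max_size=2`, the pushed chunk `[3, 4, 5]` is split across two consume calls; with no maximum, both pushed chunks are
    joined into a single consumed chunk.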

    Comparison to Ray's builtin `ray.util.queue.Queue`:
        Blocking:
            Generally, note that there are two different aspects to which the notion of "blocking" (or non-blocking) can be
            applied. As the queue is distributed and items are effectively passed across different nodes, the first "blocking"
            aspect is simply whether to wait for the result of the remote call to the underlying actor that manages &
            synchronizes the queue. The second "blocking" aspect is whether this actor itself deliberately postpones its
            response to such a remote request whenever the queue's current state does not allow the requested operation to take
            place (namely, the queue is empty in a get() call, or the queue is at maximal capacity in a put() call). Such a
            postponed-response mechanism is sometimes referred to as "long-polling".
            Ray's builtin queue has a boolean `block` flag, which relates only to the second aspect. In particular, it always
            blocks wrt the first aspect for all operations (get, put, empty). That is, the flag only determines whether the
            underlying actor may return a result before one is available. More specifically, ray.get() is called on the remote
            call to the underlying queue actor before the operation returns. In contrast, this queue by design never blocks wrt
            the first aspect.
            The second blocking aspect is only coarsely controlled in Ray's queue, via the `block` flag and the `timeout` param.
            In contrast, the queue implemented here supports somewhat *finer* control over this aspect via the two params
            `min_fetch_chunk_size` and `timeout_waiting_for_min_chunk_size`: the call returns the available items as soon as the
            first of the two conditions is met, and finishes successfully either way (see the parameter descriptions in the
            c'tor for more details). Combining the two encourages a healthier workflow that avoids the network overhead of
            small, frequent consecutive remote calls while also avoiding long latencies and even deadlocks. Ray's queue does not
            support such a combination. In conclusion, this queue differs from Ray's queue in both blocking aspects.
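
            A minimal asyncio sketch of the "first of the two conditions" rule, assuming a plain list plus an `asyncio.Event`
            instead of the actor's lock and semaphores. The names `consume` and `run_demo` are hypothetical:

            ```python
            import asyncio
            import time
            from typing import List


            async def consume(queue: List[int], new_items: asyncio.Event, min_size: int, timeout: float) -> List[int]:
                # Return once `min_size` items are queued or `timeout` seconds have passed, whichever comes first.
                deadline = time.monotonic() + timeout
                while len(queue) < min_size:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        break
                    new_items.clear()
                    try:
                        await asyncio.wait_for(new_items.wait(), timeout=remaining)
                    except asyncio.TimeoutError:
                        break  # timeout reached: still return successfully with whatever is available
                chunk = queue.copy()
                queue.clear()
                return chunk


            async def run_demo() -> List[List[int]]:
                queue: List[int] = []
                new_items = asyncio.Event()

                async def produce(delay: float, items: List[int]) -> None:
                    await asyncio.sleep(delay)
                    queue.extend(items)
                    new_items.set()

                results = []
                # Enough items arrive early: the call returns as soon as `min_size` is reached.
                producer = asyncio.create_task(produce(0.01, [1, 2, 3]))
                results.append(await consume(queue, new_items, min_size=3, timeout=1.0))
                await producer
                # Too few items arrive: the call returns the partial chunk once the timeout expires.
                producer = asyncio.create_task(produce(0.01, [4]))
                results.append(await consume(queue, new_items, min_size=3, timeout=0.1))
                await producer
                return results


            results = asyncio.run(run_demo())
            print(results)  # [[1, 2, 3], [4]]
            ```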
        Chunking / batched operations:
            Ray's builtin queue supports chunked operations via the put_nowait_batch() / get_nowait_batch() operations.
            However, get_nowait_batch() requires a fixed `num_items` param (the result is a batch of exactly this size). This
            means that the batched get call cannot succeed until the requested number of items is available (it does not simply
            return fewer items), and there's no `timeout` for this operation. This requires the user to tune this number in
            advance, both to avoid suffering from long latencies (or sometimes a deadlock) and to adapt it to the rate of the
            producer. Ray's queue is also missing an operation that simply spills all the currently available items in the queue.
            Therefore, to implement a producer-consumer pattern using Ray's queue, the user has to either limit the queue size to
            1 (blocking the producer until the consumer empties the queue), or make the consumer perform get() operations in a
            loop until the queue is empty, where each loop iteration involves blocking remote operations (including the loop
            exit condition). Otherwise, the queue could grow faster than it is consumed. In contrast, the queue implemented here
            has the finer `min/max_fetch_chunk_size` params (the min/max number of items in the returned batch/chunk), which
            allow a "spill"-like operation over the entire queue. This ultimately solves the
            rate(item consumption) < rate(item production) issue.
        Full capacity policy:
            Ray's queue allows setting the maximum queue capacity. Trying to put() an item into a queue at full capacity either
            waits for the queue to be partially consumed (in the "blocking" setting) or raises an exception (in the
            "non-blocking" setting). The queue implemented here supports either raising an exception on a full queue, or
            truncating the queue to the maximal size (a cyclic buffer that drops the oldest items).
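
            A minimal sketch of the two full-capacity policies on a plain list (no Ray). The helper `put_with_policy` is
            hypothetical; in this sketch, the 'raise' policy rejects the whole chunk and leaves the queue untouched:

            ```python
            from typing import List


            def put_with_policy(queue: List[int], chunk: List[int], max_queue_size: int, policy: str) -> List[int]:
                # Returns the new queue state; a `max_queue_size` of 0 means unlimited.
                new_queue = queue + chunk
                if max_queue_size and len(new_queue) > max_queue_size:
                    if policy == 'truncate':
                        return new_queue[-max_queue_size:]  # cyclic buffer: keep only the newest items
                    if policy == 'raise':
                        raise RuntimeError(f'Max queue size [{max_queue_size}] has been exceeded.')
                    raise ValueError(f'Unsupported max queue size handling policy {policy!r}.')
                return new_queue


            print(put_with_policy([1, 2], [3, 4, 5], max_queue_size=3, policy='truncate'))  # [3, 4, 5]
            try:
                put_with_policy([1, 2], [3, 4, 5], max_queue_size=3, policy='raise')
            except RuntimeError as err:
                print(err)  # Max queue size [3] has been exceeded.
            ```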

    Only this class is exposed to and used by the user. Objects of this class are used directly by the end-clients
    (consumers/producers) in their main process (and in the same main thread). Under the hood, such an object holds a handle to
    the underlying queue actor, which synchronizes the queue and is accessed internally by these client-side objects. In other
    words, this object is used as a communicator with the actual queue actor. The underlying queue actor is created here,
    without requiring the user's intervention or awareness. Thus, the user is only aware of, and only works with, these
    "communicator" objects of this main class. This object can be passed on (by copy) to other user actors: it is typically
    created once somewhere and then passed to the relevant end-points (producers and consumers) that should communicate.
    """

    def __init__(self,
                 min_fetch_chunk_size: int = 1,
                 max_fetch_chunk_size: Optional[int] = None,
                 timeout_waiting_for_min_chunk_size: Optional[float] = None,
                 max_queue_size: int = 0,
                 max_queue_size_exceed_handling_policy: str = 'truncate',  # in {'truncate', 'raise'}
                 actor_options: Optional[dict] = None):
        """
        :param min_fetch_chunk_size: The minimum number of items to wait for in a consume_chunk() operation. Namely, the
                                     response to the call is suspended until this number of items becomes available in the
                                     queue (or until the timeout is reached, if set; whichever comes first decides when the
                                     call returns).
        :param max_fetch_chunk_size: The maximum number of items in the chunk returned to the user by consume_chunk(). If given
                                     and |queue| > max_fetch_chunk_size, only the first (oldest) max_fetch_chunk_size items are
                                     popped from the queue and returned by this consume call. If not given (None), each
                                     consume_chunk() operation spills all the available items in the queue. Note that either
                                     way, this parameter never causes the call to block (once enough items are available wrt
                                     the minimum requirement, the call returns).
        :param timeout_waiting_for_min_chunk_size: Optional. Timeout after which a consume_chunk() operation returns even if not
                                                   enough items are available (fewer than `min_fetch_chunk_size`). In such a
                                                   case, the operation is not considered failed; instead, it just returns the
                                                   available items (up to `max_fetch_chunk_size`; possibly an empty list)
                                                   although `min_fetch_chunk_size` has not been reached. Namely, the
                                                   consume_chunk() operation ends successfully (either way) as soon as the
                                                   first of the given conditions (timeout / min_fetch_chunk_size) is met.
        :param max_queue_size: Optional. If set (non-zero), limits the number of items in the queue to this size, enforced by
                               the chosen policy (defined below). The default, 0, means an unlimited queue.
        :param max_queue_size_exceed_handling_policy: Defines how the maximal queue size (above; if set) is enforced. Either
                                                      'truncate' (keep the latest tail, removing the oldest items) or 'raise'
                                                      (raise an exception in a put_chunk() operation that would make the queue
                                                      exceed the maximal size).
        :param actor_options: Optional. Additional Ray remote options to pass to the underlying queue actor. For example, allows
                              explicitly setting the node / placement group in which this actor is instantiated.
        """
        assert max_queue_size_exceed_handling_policy in {'truncate', 'raise'}
        self._pending_get_future = None
        actor_options = {} if actor_options is None else actor_options
        try:
            self._actor = _ProducerConsumerRayQueueAsyncActor.options(**actor_options).remote(
                max_queue_size=max_queue_size, max_queue_size_exceed_handling_policy=max_queue_size_exceed_handling_policy)
        except RaySystemError:
            # Async actors are not supported in Ray's local run mode (which is active in debug mode). We use a threaded actor
            # instead in such a case. The #threads should be at least 2, so that a produce operation can be served while a
            # consume operation takes place (and possibly waits for samples, as explained in more detail in the actor's
            # implementation below).
            nr_concurrent_threads = 2
            assert 'max_concurrency' not in actor_options
            threaded_actor_type = _ProducerConsumerRayQueueThreadedActor.options(
                **actor_options, max_concurrency=nr_concurrent_threads)
            self._actor = threaded_actor_type.remote(
                max_queue_size=max_queue_size,
                max_queue_size_exceed_handling_policy=max_queue_size_exceed_handling_policy,
                nr_concurrent_threads=nr_concurrent_threads)
        self._min_fetch_chunk_size = min_fetch_chunk_size
        self._max_fetch_chunk_size = max_fetch_chunk_size
        self._timeout_waiting_for_min_chunk_size = timeout_waiting_for_min_chunk_size

    def consume_chunk(self) -> List[T]:
        """
        This is a non-blocking polling operation. When called for the first time, it creates a request, stores its future
        (promise) as a member of this object (to be polled in succeeding calls), and returns an empty list. Succeeding calls
        then poll (without blocking) the future of that same request. Eventually, there will be a call (to this method) in which
        the results of this request are ready. In that call, the resulting chunk is returned to the user and a new request is
        dispatched (as a prefetch). Thus, the request dispatched to the queue's actor during a certain call to `consume_chunk()`
        is not necessarily (and probably not) the same request whose results are returned in that very call.
        The hidden assumption is that the workload is endlessly continuous, so we can always make a request for future consume
        calls.
        Note that this object is created once and then passed to the producers & consumers. Namely, upon creation it is not yet
        aware of whether this end is used as a consumer. Thus, there's no initial pending prefetch request. The initial consume
        request (to the queue's actor) is created only upon the first call to this consume method.
        """
        self._prefetch()
        finished_fetching, _ = ray.wait([self._pending_get_future], timeout=0., fetch_local=True)
        fetched_chunk = []
        if finished_fetching:
            fetched_chunk = ray.get(self._pending_get_future)
            self._pending_get_future = None
            self._prefetch()
        return fetched_chunk
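
    def put_chunk(self, items: List[T]) -> ray.ObjectRef:
        # Non-blocking: the call is dispatched to the actor and not awaited. The returned reference can be passed to
        # `ray.get()` to wait for completion and to surface errors raised by the actor (e.g., by the 'raise' policy).
        return self._actor.put_chunk.remote(items)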

    def clear(self, block: bool = False) -> Optional[ray.ObjectRef]:
        clear_future = self._actor.clear.remote()
        # When cleared from a consumer end, discard the formerly dispatched pending fetch operation of that consumer, to
        # guarantee that pre-clear items won't be returned to this consumer after the clearance. This property is irrelevant
        # across consumers, as the operations of different consumers are not assumed to be synchronized anyway.
        self._pending_get_future = None
        if block:
            ray.get(clear_future)
        else:
            return clear_future

    def _prefetch(self):
        if self._pending_get_future is None:
            self._pending_get_future = self._actor.consume_chunk.remote(
                min_size=self._min_fetch_chunk_size, max_size=self._max_fetch_chunk_size,
                timeout_waiting_for_min_size=self._timeout_waiting_for_min_chunk_size)


@ray.remote
class _ProducerConsumerRayQueueAsyncActor:
    """
    The actor is not exposed to the queue's end-user and is known only to the exposed queue handle (defined above). The actor
    serves as the centralized place for managing & maintaining the queue itself. The exposed instance (used by the user)
    creates an actor, holds a reference to it, and makes remote calls to it under the hood to perform the actual queue
    operations.

    There are 2 implementations of the queue actor (async & threaded). This is the async implementation, which is generally
    preferable. Here we explain what async & threaded actors are, why they're important here, which problem they solve, what
    their limitations are, and how they differ.

    The most significant and delicate issue with the queue actor is allowing consume & produce operations to be served by the
    actor "concurrently". In other words, a "produce" operation must always be able to get scheduled and served, even if an
    unlimited number of consume operations have already been triggered before it (and may have started being served).
    Otherwise, serving the consume operations without serving the produce operation could lead to a deadlock, where the consume
    operations wait indefinitely for enough items in the queue, which would never become available. In more detail, as a
    consume operation may not return immediately (it waits for enough items to become available), the actor must be able to
    execute a pending produce call before it has finished serving that consume call.
    The (simpler) threaded actor (not this implementation, but the one below) has a limitation in guaranteeing this desired
    property (explained in detail in the threaded actor implementation below).
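
    A minimal asyncio sketch of this desired property (not of the actor itself), assuming a plain list and an
    `asyncio.Semaphore(0)` as a new-items notifier, similar in spirit to `_new_items_notifier_sem` but simplified; `run_demo`
    is a hypothetical name. The consumer starts first and waits for two items, yet the producer still runs on the same
    single-threaded event loop, because awaiting the semaphore yields control instead of blocking:

    ```python
    import asyncio
    from typing import List


    async def run_demo() -> List[int]:
        queue: List[int] = []
        lock = asyncio.Lock()
        new_items = asyncio.Semaphore(0)

        async def consume(min_size: int) -> List[int]:
            while True:
                async with lock:
                    if len(queue) >= min_size:
                        chunk = queue.copy()
                        queue.clear()
                        return chunk
                await new_items.acquire()  # yields to the event loop until a producer signals

        async def produce(item: int) -> None:
            await asyncio.sleep(0.01)  # items arrive over time
            async with lock:
                queue.append(item)
            new_items.release()

        async def producer() -> None:
            for item in (1, 2):
                await produce(item)

        # The consumer is scheduled first, but it does not starve the producer.
        chunk, _ = await asyncio.gather(consume(min_size=2), producer())
        return chunk


    chunk = asyncio.run(run_demo())
    print(chunk)  # [1, 2]
    ```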

    Here, in the async implementation, the two main queue operations (consume_chunk, put_chunk) are implemented as async
    functions. That is, the user can invoke multiple calls, which can all start executing "together" asynchronously as
    coroutines. Control is passed between these coroutines during their lifetimes. A consume coroutine can yield (for example,
    if not enough items are available) in favor of scheduling another pending produce operation. In practice, this solves the
    issue described above.
    """

    def __init__(self, max_queue_size: int = 0, max_queue_size_exceed_handling_policy: str = 'truncate'):
        assert max_queue_size_exceed_handling_policy in {'truncate', 'raise'}
        self._queue: List[T] = []
        self._queue_lock = asyncio.Lock()  # for accessing the queue concurrently by the reader & writers
        self._new_items_notifier_sem = asyncio.Semaphore(0)  # for waking up awaiting consumer requests when new data arrives
        self._serial_consumers_serving_sem = asyncio.BoundedSemaphore(1)  # for serving consumers' requests serially
        self._max_queue_size = max_queue_size
        self._max_queue_size_exceed_handling_policy = max_queue_size_exceed_handling_policy

    async def consume_chunk(self, min_size: int = 1, max_size: Optional[int] = None,
                            timeout_waiting_for_min_size: Optional[float] = None) -> List[T]:
        assert min_size >= 0
        assert max_size is None or min_size <= max_size
        # ~FIFO readers serving - simplest policy to guarantee no starving calls with relatively big requested `min_size`
        start_time = time.monotonic()
        async with self._serial_consumers_serving_sem:
            # Wait in a loop until enough items are available. Note that we wait while holding the consumer-serving semaphore.
            # That means that no other consumer's request can be served while this request waits to be served. This avoids
            # starvation of big requests.
            should_wait_for_new_items = False
            while True:
                if should_wait_for_new_items:
                    rest_timeout = None if timeout_waiting_for_min_size is None else \
                        max(0., timeout_waiting_for_min_size - (time.monotonic() - start_time))
                    try:
                        # wait for a new chunk of items to be added
                        await asyncio.wait_for(self._new_items_notifier_sem.acquire(), timeout=rest_timeout)
                    except asyncio.TimeoutError:
                        pass  # timeout passed - should return a chunk although min_size might not be available yet
                # we lock the queue inside the attempts loop - so it won't stay locked while we wait
                async with self._queue_lock:
                    if len(self._queue) < min_size and (
                            timeout_waiting_for_min_size is None or
                            time.monotonic() - start_time < timeout_waiting_for_min_size):
                        should_wait_for_new_items = True
                        continue
                    if max_size is None:
                        returned_chunk = self._queue.copy()
                        self._queue.clear()
                    else:
                        returned_chunk = self._queue[:max_size].copy()
                        self._queue = self._queue[max_size:]
                    return returned_chunk

    async def put_chunk(self, chunk: List[T]):
        async with self._queue_lock:
            if (self._max_queue_size and len(self._queue) + len(chunk) > self._max_queue_size
                    and self._max_queue_size_exceed_handling_policy == 'raise'):
                # Checked before appending, so a rejected chunk never leaves the queue above its maximal size.
                raise RuntimeError(f'Max queue size [{self._max_queue_size}] has been exceeded.')
            self._queue.extend(chunk)
            if self._max_queue_size and len(self._queue) > self._max_queue_size:
                if self._max_queue_size_exceed_handling_policy == 'truncate':
                    # Keep the latest tail (remove the oldest items), like a cyclic buffer.
                    self._queue = self._queue[-self._max_queue_size:]
                else:
                    raise ValueError(f'Unsupported max queue size handling policy `{self._max_queue_size_exceed_handling_policy}`.')
            # We keep the semaphore bounded by 1 to avoid redundant wake-ups of consumer requests. The following condition
            # practically bounds this semaphore by 1 (we don't use a `BoundedSemaphore` because we'd like to initialize it to 0
            # but bound it by 1, and we cannot call acquire() from __init__ as it's an async method).
            # Note that under the queue lock no other release() can be made on this semaphore (this is the only place it is
            # released); that is, after the "if" condition below evaluates to true (the semaphore is locked), this status
            # cannot change before the increment statement (releasing the semaphore) within the if-branch.
            if self._new_items_notifier_sem.locked():
                self._new_items_notifier_sem.release()  # notify consumers that there are new items available in the queue

    async def clear(self):
        async with self._queue_lock:
            self._queue.clear()


@ray.remote
class _ProducerConsumerRayQueueThreadedActor:
    """
    For more information about the queue actor, see the documentation of `_ProducerConsumerRayQueueAsyncActor` above.

    Async actors (implemented above) are not supported in local mode (used for debugging). Thus, we implement here a (generally
    less preferable) threaded actor for the local run mode. A threaded actor can serve a predefined number n of calls
    concurrently, but it doesn't guarantee keeping a thread available for a future produce operation. Thus, n consume operations
    can fully occupy the n available threads at the same time (each one blocked because not enough items are currently
    available in the queue), leaving no thread for another pending *produce* operation; this causes a deadlock (in case of
    multiple consumers).
    To avoid such deadlocks, a consume call exits immediately (returning an empty chunk) when consumers would otherwise fully
    utilize all the available threads. This guarantees that an available thread is preserved for a future produce call.
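
    A minimal sketch of this reservation rule, assuming a hypothetical `ConsumerSlotGuard` helper (not part of this module)
    that admits a consumer only while at least one of `nr_threads` threads would remain free for put_chunk() calls; it
    illustrates the intent of the consumer counter used below:

    ```python
    import threading
    from typing import List


    class ConsumerSlotGuard:
        def __init__(self, nr_threads: int):
            self._nr_threads = nr_threads
            self._nr_consumers = 0
            self._lock = threading.Lock()

        def try_enter(self) -> bool:
            # Admit a consumer only if at least one more thread would remain free for a producer.
            with self._lock:
                if self._nr_consumers >= self._nr_threads - 1:
                    return False  # the caller should return an empty chunk right away
                self._nr_consumers += 1
                return True

        def leave(self) -> None:
            with self._lock:
                self._nr_consumers -= 1


    guard = ConsumerSlotGuard(nr_threads=2)
    outcomes: List[bool] = [guard.try_enter()]  # True: the first consumer may wait for items
    outcomes.append(guard.try_enter())  # False: the last thread stays reserved for put_chunk()
    guard.leave()
    outcomes.append(guard.try_enter())  # True: a consumer slot is free again
    print(outcomes)  # [True, False, True]
    ```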

    Note: Having said that, newer versions of Ray (which are currently not available to us) have a feature called concurrency
    groups that can solve this issue by strictly & explicitly assigning a given number of threads to each kind of operation.
    """

    def __init__(self, max_queue_size: int = 0, max_queue_size_exceed_handling_policy: str = 'truncate',
                 nr_concurrent_threads: Optional[int] = None):
        assert max_queue_size_exceed_handling_policy in {'truncate', 'raise'}
        self._queue: List[T] = []
        # For accessing the queue concurrently by the reader & writers.
        self._queue_lock = threading.Lock()
        # For waking up awaiting consumer requests when new data arrives.
        # Keeping the semaphore bounded helps avoid redundant wake-ups of consumer requests.
        self._new_items_notifier_sem = threading.BoundedSemaphore(1)  # set max value to 1
        self._new_items_notifier_sem.acquire()  # set initial value to 0 (no new items currently)
        # For serving consumers' requests serially (guarantees serving all of them eventually).
        self._serial_consumers_serving_sem = threading.BoundedSemaphore(1)
        self._max_queue_size = max_queue_size
        self._max_queue_size_exceed_handling_policy = max_queue_size_exceed_handling_policy
        self._nr_concurrent_threads = nr_concurrent_threads
        self._cur_nr_concurrently_served_consume_operations = 0
        self._cur_served_consumers_counter_lock = threading.Lock()

    def consume_chunk(self, min_size: int = 1, max_size: Optional[int] = None,
                      timeout_waiting_for_min_size: Optional[float] = None) -> List[T]:
        assert min_size >= 0
        assert max_size is None or min_size <= max_size
        # Guarantee at least one thread is preserved available for a future produce operation if #threads is known.
        if self._nr_concurrent_threads is not None:
            with self._cur_served_consumers_counter_lock:
                assert self._nr_concurrent_threads >= self._cur_nr_concurrently_served_consume_operations
                if self._cur_nr_concurrently_served_consume_operations >= self._nr_concurrent_threads - 1:
                    # Exit a consume call immediately if serving it would leave no thread for a produce call (to avoid a
                    # deadlock). Consumers are served serially anyway, so this does not limit the consume throughput.
                    return []
                self._cur_nr_concurrently_served_consume_operations += 1
        try:
            start_time = time.monotonic()
            # ~FIFO readers serving - simplest policy to guarantee no starving calls with relatively big requested `min_size`
            with self._serial_consumers_serving_sem:
                # Wait in a loop until enough items are available. Note that we wait while holding the consumer-serving
                # semaphore. That means that no other consumer's request can be served while this request waits to be
                # served. This avoids starvation of big requests.
                should_wait_for_new_items = False
                while True:
                    if should_wait_for_new_items:
                        rest_timeout = None if timeout_waiting_for_min_size is None else \
                            max(0., timeout_waiting_for_min_size - (time.monotonic() - start_time))
                        # Note: A threading semaphore's acquire() returns whether it managed to acquire within the given
                        # time span. However, we don't care here whether the timeout passed; we just want to limit the
                        # waiting. Below we detect an exceeded timeout and stop waiting for new items in such a case.
                        self._new_items_notifier_sem.acquire(timeout=rest_timeout)  # wait for a new chunk of items
                    with self._queue_lock:  # we lock the queue inside the attempts loop - so it won't stay locked while we wait
                        if len(self._queue) < min_size and (
                                timeout_waiting_for_min_size is None or
                                time.monotonic() - start_time < timeout_waiting_for_min_size):
                            should_wait_for_new_items = True
                            continue
                        if max_size is None:
                            returned_chunk = self._queue.copy()
                            self._queue.clear()
                        else:
                            returned_chunk = self._queue[:max_size].copy()
                            self._queue = self._queue[max_size:]
                        return returned_chunk
        finally:
            # Release this consumer's slot, also if an exception was raised while serving it.
            if self._nr_concurrent_threads is not None:
                with self._cur_served_consumers_counter_lock:
                    self._cur_nr_concurrently_served_consume_operations -= 1

    def put_chunk(self, chunk: List[T]):
        with self._queue_lock:
            if (self._max_queue_size and len(self._queue) + len(chunk) > self._max_queue_size
                    and self._max_queue_size_exceed_handling_policy == 'raise'):
                # Checked before appending, so a rejected chunk never leaves the queue above its maximal size.
                raise RuntimeError(f'Max queue size [{self._max_queue_size}] has been exceeded.')
            self._queue.extend(chunk)
            if self._max_queue_size and len(self._queue) > self._max_queue_size:
                if self._max_queue_size_exceed_handling_policy == 'truncate':
                    # Keep the latest tail (remove the oldest items), like a cyclic buffer.
                    self._queue = self._queue[-self._max_queue_size:]
                else:
                    raise ValueError(f'Unsupported max queue size handling policy `{self._max_queue_size_exceed_handling_policy}`.')
            try:
                self._new_items_notifier_sem.release()  # notify consumers that there are new items available in the queue
            except ValueError:
                # As this semaphore is bounded by 1, release() raises if its current value is already 1. This poses no
                # issue: we just want to make sure the value is 1, to ensure a consumer will be woken up.
                pass

    def clear(self):
        with self._queue_lock:
            self._queue.clear()