Created
October 11, 2020 08:12
-
-
Save fr-ser/347c00e0026cd7941e1d73fd36e54887 to your computer and use it in GitHub Desktop.
Example code to allow take to have manually acknowledged messages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import faust | |
from faust import Stream | |
from noack_take import noack_take | |
# monkeypatch noack_take on streams | |
Stream.noack_take = noack_take |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from asyncio import CancelledError | |
from mode import Seconds, want_seconds | |
from mode.utils.aiter import aiter | |
from mode.utils.futures import notify | |
from typing import ( | |
AsyncIterable, | |
List, | |
Optional, | |
Sequence, | |
cast, | |
) | |
from faust.types import ChannelT, EventT | |
from faust.types.streams import ( | |
T, | |
T_co, | |
) | |
# mainly copied from faust | |
# faust/streams.py | |
# changes from the faust tests are marked with a "code change" comment | |
async def noack_take(self, max_: int, | |
within: Seconds) -> AsyncIterable[Sequence[T_co]]: | |
"""Buffer n values at a time and yield a list of buffered values. | |
Arguments: | |
within: Timeout for when we give up waiting for another value, | |
and process the values we have. | |
Warning: If there's no timeout (i.e. `timeout=None`), | |
the agent is likely to stall and block buffered events for | |
an unreasonable length of time(!). | |
""" | |
buffer: List[T_co] = [] | |
events: List[EventT] = [] | |
buffer_add = buffer.append | |
event_add = events.append | |
buffer_size = buffer.__len__ | |
buffer_full = asyncio.Event(loop=self.loop) | |
buffer_consumed = asyncio.Event(loop=self.loop) | |
timeout = want_seconds(within) if within else None | |
stream_enable_acks: bool = self.enable_acks | |
buffer_consuming: Optional[asyncio.Future] = None | |
channel_it = aiter(self.channel) | |
# We add this processor to populate the buffer, and the stream | |
# is passively consumed in the background (enable_passive below). | |
async def add_to_buffer(value: T) -> T: | |
try: | |
# buffer_consuming is set when consuming buffer | |
# after timeout. | |
nonlocal buffer_consuming | |
if buffer_consuming is not None: | |
try: | |
await buffer_consuming | |
finally: | |
buffer_consuming = None | |
# code change: We want to save events instead of values | |
# buffer_add(cast(T_co, value)) | |
event = self.current_event | |
# code change: We want to save events instead of values | |
buffer_add(cast(T_co, event)) | |
if event is None: | |
raise RuntimeError( | |
'Take buffer found current_event is None') | |
event_add(event) | |
if buffer_size() >= max_: | |
# signal that the buffer is full and should be emptied. | |
buffer_full.set() | |
# strict wait for buffer to be consumed after buffer | |
# full. | |
# If max is 1000, we are not allowed to return 1001 | |
# values. | |
buffer_consumed.clear() | |
await self.wait(buffer_consumed) | |
except CancelledError: # pragma: no cover | |
raise | |
except Exception as exc: | |
self.log.exception('Error adding to take buffer: %r', exc) | |
await self.crash(exc) | |
return value | |
# Disable acks to ensure this method acks manually | |
# events only after they are consumed by the user | |
self.enable_acks = False | |
self.add_processor(add_to_buffer) | |
self._enable_passive(cast(ChannelT, channel_it)) | |
try: | |
while not self.should_stop: | |
# wait until buffer full, or timeout | |
await self.wait_for_stopped(buffer_full, timeout=timeout) | |
if buffer: | |
# make sure background thread does not add new items to | |
# buffer while we read. | |
buffer_consuming = self.loop.create_future() | |
try: | |
yield list(buffer) | |
finally: | |
buffer.clear() | |
# code change: We want to manually ack | |
# for event in events: | |
# await self.ack(event) | |
events.clear() | |
# allow writing to buffer again | |
notify(buffer_consuming) | |
buffer_full.clear() | |
buffer_consumed.set() | |
else: # pragma: no cover | |
pass | |
else: # pragma: no cover | |
pass | |
finally: | |
# Restore last behaviour of "enable_acks" | |
self.enable_acks = stream_enable_acks | |
self._processors.remove(add_to_buffer) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
from typing import NamedTuple | |
import faust | |
from faust.utils.tracing import set_current_span | |
from mode.utils.mocks import Mock | |
import pytest | |
from noack_take import noack_take | |
# mainly copied from faust tests | |
# faust/t/functional/test_streams.py | |
# changes from the faust tests are marked with a "code change" comment | |
@pytest.mark.asyncio | |
async def test_take(app): | |
async with new_stream(app) as s: | |
# code change: change assert to False | |
assert s.enable_acks is False | |
await s.channel.send(value=1) | |
event = None | |
# code change: using noack_take instead of take | |
# code change: noack_take returns event instead of value | |
async for noack_value in s.noack_take(s, 1, within=1): | |
assert noack_value[0].value == 1 | |
assert s.enable_acks is False | |
event = mock_stream_event_ack(s) | |
break | |
assert event | |
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0 | |
# need two sleeps on Python 3.6.7 + 3.7.1 :-/ | |
await asyncio.sleep(0) # needed for some reason | |
await asyncio.sleep(0) # needed for some reason | |
if not event.ack.called: | |
# code change: assert to not acked | |
assert not event.message.acked | |
assert not event.message.refcount | |
# code change: change assert to False | |
assert s.enable_acks is False | |
@pytest.mark.asyncio | |
async def test_take__10(app, loop): | |
s = new_stream(app) | |
async with s: | |
# code change: change assert to False | |
assert s.enable_acks is False | |
for i in range(9): | |
await s.channel.send(value=i) | |
async def in_one_second_finalize(): | |
await s.sleep(1.0) | |
await s.channel.send(value=9) | |
for i in range(10): | |
await s.channel.send(value=i + 10) | |
asyncio.ensure_future(in_one_second_finalize()) | |
event = None | |
# code change: using noack_take instead of take | |
buffer_processor = s.noack_take(s, 10, within=10.0) | |
async for noack_value in buffer_processor: | |
assert [nv.value for nv in noack_value] == list(range(10)) | |
assert s.enable_acks is False | |
event = mock_stream_event_ack(s) | |
break | |
async for noack_value in buffer_processor: | |
assert [nv.value for nv in noack_value] == list(range(10, 20)) | |
assert s.enable_acks is False | |
break | |
try: | |
await buffer_processor.athrow(asyncio.CancelledError()) | |
except asyncio.CancelledError: | |
pass | |
assert event | |
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0 | |
# need two sleeps on Python 3.6.7 + 3.7.1 :-/ | |
await asyncio.sleep(0) # needed for some reason | |
await asyncio.sleep(0) # needed for some reason | |
await asyncio.sleep(0) # needed for some reason | |
if not event.ack.called: | |
# code change: assert to not acked | |
assert not event.message.acked | |
assert not event.message.refcount | |
# code change: change assert to False | |
assert s.enable_acks is False | |
# code change: my own test to manually acknowledge | |
@pytest.mark.asyncio | |
async def test_take__10_and_ack(app, loop): | |
s = new_stream(app) | |
async with s: | |
# code change: change assert to False | |
assert s.enable_acks is False | |
for i in range(9): | |
await s.channel.send(value=i) | |
async def in_one_second_finalize(): | |
await s.sleep(1.0) | |
await s.channel.send(value=9) | |
for i in range(10): | |
await s.channel.send(value=i + 10) | |
asyncio.ensure_future(in_one_second_finalize()) | |
event = None | |
# code change: using noack_take instead of take | |
buffer_processor = s.noack_take(s, 10, within=10.0) | |
async for noack_values in buffer_processor: | |
assert [nv.value for nv in noack_values] == list(range(10)) | |
assert s.enable_acks is False | |
event = mock_stream_event_ack(s) | |
# code change: acknowledge | |
for noack_event in noack_values: | |
await s.ack(noack_event) | |
break | |
async for noack_values in buffer_processor: | |
assert [nv.value for nv in noack_values] == list(range(10, 20)) | |
assert s.enable_acks is False | |
# code change: acknowledge | |
for noack_event in noack_values: | |
await s.ack(noack_event) | |
break | |
try: | |
await buffer_processor.athrow(asyncio.CancelledError()) | |
except asyncio.CancelledError: | |
pass | |
assert event | |
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0 | |
# need two sleeps on Python 3.6.7 + 3.7.1 :-/ | |
await asyncio.sleep(0) # needed for some reason | |
await asyncio.sleep(0) # needed for some reason | |
await asyncio.sleep(0) # needed for some reason | |
if not event.ack.called: | |
assert event.message.acked | |
assert not event.message.refcount | |
# code change: change assert to False | |
assert s.enable_acks is False | |
@pytest.mark.asyncio | |
async def test_take__no_event_crashes(app, loop): | |
class NoCurrentEventStream(faust.Stream): | |
@property | |
def current_event(self): | |
return None | |
@current_event.setter | |
def current_event(self, event): | |
pass | |
app.conf.Stream = NoCurrentEventStream | |
s = new_stream(app) | |
assert isinstance(s, NoCurrentEventStream) | |
async with s: | |
# code change: change assert to False | |
assert s.enable_acks is False | |
await s.channel.send(value=1) | |
# code change: using noack_take instead of takevalue=1) | |
buffer_processor = s.noack_take(s, 10, within=10.0) | |
print('STARTING STREAM ITERATION') | |
async for value in buffer_processor: | |
print(f'RECEIVED VALUE: {value!r}') | |
break | |
print('ENDING STREAM ITERATION') | |
try: | |
await buffer_processor.athrow(asyncio.CancelledError()) | |
except asyncio.CancelledError: | |
pass | |
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0 | |
# need two sleeps on Python 3.6.7 + 3.7.1 :-/ | |
await asyncio.sleep(0) # needed for some reason | |
await asyncio.sleep(0) # needed for some reason | |
await asyncio.sleep(0) # needed for some reason | |
assert isinstance(s._crash_reason, RuntimeError) | |
print('RETURNING') | |
# code change: change assert to False | |
assert s.enable_acks is False | |
def mock_stream_event_ack(stream, return_value=False): | |
return mock_event_ack(stream.current_event, return_value=return_value) | |
def mock_event_ack(event, return_value=False): | |
event.ack = Mock(name='ack') | |
event.ack.return_value = return_value | |
return event | |
def new_stream(app, *args, **kwargs): | |
app = _prepare_app(app) | |
# code change: Changed stream to noack stream and added noack_take | |
ack_stream = _new_stream(app, app.channel(loop=app.loop, maxsize=1000), **kwargs) | |
noack_stream = ack_stream.noack() | |
setattr(noack_stream, "noack_take", noack_take) | |
return noack_stream | |
def new_topic_stream(app, *args, name: str = 'test', **kwargs): | |
app = _prepare_app(app) | |
return _new_stream(app, app.topic(name, loop=app.loop), **kwargs) | |
def _new_stream(app, channel, *args, **kwargs): | |
return channel.stream(*args, loop=app.loop, **kwargs) | |
# faust/t/functional/conftest.py | |
class AppMarks(NamedTuple): | |
name: str = 'funtest' | |
store: str = 'memory://' | |
cache: str = 'memory://' | |
def create_appmarks(name='funtest', | |
store='memory://', | |
cache='memory://', | |
**rest): | |
options = AppMarks( | |
name=name, | |
store=store, | |
cache=cache, | |
) | |
return options, rest | |
@pytest.yield_fixture() | |
def app(event_loop, request): | |
os.environ.pop('F_DATADIR', None) | |
os.environ.pop('FAUST_DATADIR', None) | |
os.environ.pop('F_WORKDIR', None) | |
os.environ.pop('FAUST_WORKDIR', None) | |
marks = request.node.get_closest_marker('app') | |
options, rest = create_appmarks( | |
**((marks.kwargs or {}) if marks else {})) | |
app = faust.App( | |
options.name, | |
store=options.store, | |
cache=options.cache, | |
**rest, | |
) | |
app.finalize() | |
set_current_span(None) | |
try: | |
yield app | |
finally: | |
assert app.tracer is None | |
def _prepare_app(app): | |
loop = asyncio.get_event_loop() | |
app.loop = loop | |
app.flow_control.resume() # <-- flow control initially suspended | |
return app | |
# faust/t/conftest.py | |
@pytest.fixture() | |
def loop(event_loop): | |
return event_loop |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment