# Last active: July 22, 2022 15:40
#
# Save graingert/cb40f8000f3ef52b64dd6b4df5648e68 to your computer and use it in GitHub Desktop.
# This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters
import asyncio
import contextlib
from collections.abc import AsyncGenerator
class _AcknowledgeException:
    """Marker object that carries an exception out of the async generator.

    ``agen_fn`` yields instances of this class instead of letting the
    exception propagate; ``CoroutineWrapper`` recognises the marker on the
    value carried by ``StopIteration`` and raises the stored exception.
    """

    def __init__(self, exception: BaseException) -> None:
        # The exception (instance or class) to be re-raised by the wrapper.
        self.exception = exception

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.exception!r})"
class _RecommendGeneratorExit(BaseException):
    """Exception thrown into the async generator to ask it to finish.

    ``AsyncGenWrapper.aclose`` throws this class via ``athrow``. It derives
    from ``BaseException`` so that ``except Exception`` handlers inside the
    generator do not catch it.
    """
async def agen_fn() -> AsyncGenerator["_AcknowledgeException", None]:
    """Demo async generator that acknowledges exceptions by yielding markers.

    The generator waits on a fresh ``asyncio.Event`` that nothing sets.  When
    a wait is cancelled, the ``CancelledError`` is yielded back wrapped in
    ``_AcknowledgeException`` rather than propagated.  A
    ``_RecommendGeneratorExit`` thrown in is handled at two nesting levels,
    so a second close request arriving while the first is being handled is
    still caught.

    Yields:
        ``_AcknowledgeException`` instances wrapping the exception being
        acknowledged.
    """
    # NOTE(review): the nesting below was reconstructed from a listing whose
    # indentation had been lost; confirm that the final ``yield`` of each
    # close handler sits at handler level rather than inside the
    # ``except asyncio.CancelledError`` clause.
    try:
        try:
            while True:
                try:
                    # Nothing sets this event, so the wait can only end by
                    # an exception being raised into it.
                    await asyncio.Event().wait()
                except asyncio.CancelledError as cancelled:
                    yield _AcknowledgeException(cancelled)
        except _RecommendGeneratorExit as genexit:
            # First close request: wait again, acknowledging a cancellation
            # if one arrives, then acknowledge the close request itself.
            try:
                await asyncio.Event().wait()
            except asyncio.CancelledError as cancelled:
                yield _AcknowledgeException(cancelled)
            yield _AcknowledgeException(genexit)
    except _RecommendGeneratorExit as genexit2:
        # Close request raised while the inner handler was running.
        try:
            await asyncio.Event().wait()
        except asyncio.CancelledError as cancelled:
            yield _AcknowledgeException(cancelled)
        yield _AcknowledgeException(genexit2)
class CoroutineWrapper:
    """Awaitable proxy that turns acknowledged exceptions back into raises.

    Wraps an object exposing ``send`` and ``throw`` (in this file, the
    awaitables returned by an async generator's ``asend``/``athrow``).  When
    the wrapped object finishes with a ``StopIteration`` whose value is an
    ``_AcknowledgeException``, the exception stored in that marker is raised
    instead; any other ``StopIteration`` is passed through unchanged.
    """

    def __init__(self, coro) -> None:
        self._coro = coro

    def __await__(self):
        # The wrapper acts as its own iterator, so ``await`` drives it
        # through ``__next__``/``send``/``throw`` below.
        return self

    def __next__(self):
        return self.send(None)

    def send(self, v):
        """Send ``v`` into the wrapped object and return what it yields."""
        return self._resume(self._coro.send, v)

    def throw(self, *args, **kwargs):
        """Throw into the wrapped object and return what it yields."""
        return self._resume(self._coro.throw, *args, **kwargs)

    def _resume(self, step, *args, **kwargs):
        """Call ``step`` and unwrap an acknowledged exception, if any."""
        try:
            return step(*args, **kwargs)
        except StopIteration as stop:
            result = stop.value
            if not isinstance(result, _AcknowledgeException):
                raise
        # Raised after leaving the handler so the StopIteration is not
        # attached to the acknowledged exception as its ``__context__``.
        raise result.exception
class AsyncGenWrapper:
    """Async-iterator facade over an async generator.

    Every ``asend``/``athrow`` awaitable of the wrapped generator is routed
    through ``CoroutineWrapper``, and ``aclose`` asks the generator to finish
    by throwing ``_RecommendGeneratorExit`` into it.
    """

    def __init__(self, agen: AsyncGenerator) -> None:
        self._agen = agen

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self.asend(None)

    async def asend(self, v):
        """Send ``v`` into the generator and return the next yielded value."""
        return await CoroutineWrapper(self._agen.asend(v))

    async def athrow(self, *args, **kwargs):
        """Throw into the generator and return the next yielded value."""
        return await CoroutineWrapper(self._agen.athrow(*args, **kwargs))

    async def aclose(self):
        """Ask the generator to finish by throwing ``_RecommendGeneratorExit``.

        Returns whatever the ``athrow`` call produces, or ``None`` when the
        close request propagates back out of the generator.
        """
        try:
            return await CoroutineWrapper(self._agen.athrow(_RecommendGeneratorExit))
        except _RecommendGeneratorExit:
            # The request came straight back out: nothing left to do.
            return None
async def amain():
    """Drive the wrapped generator through timeouts and a close request.

    Steps performed:

    1. Iterate the wrapped generator twice under ``asyncio.timeout(0)``;
       each attempt must end with ``TimeoutError`` (``RuntimeError`` is
       raised otherwise).
    2. Reschedule the outer timeout to ``-1`` before leaving the
       ``contextlib.aclosing`` block.
    3. In ``finally``, call ``aclose`` again under a one-second timeout.
    """
    # Bound before the ``try`` so the ``finally`` clause can always refer to
    # it, even if entering one of the context managers fails.
    agen = AsyncGenWrapper(agen_fn())
    try:
        async with asyncio.timeout(None) as timeout:
            async with contextlib.aclosing(agen):
                # Two identical probes: iterating must time out both times.
                for _ in range(2):
                    try:
                        async with asyncio.timeout(0):
                            async for v in agen:
                                print(v)
                    except TimeoutError:
                        print("timed out")
                    else:
                        raise RuntimeError

                # Must happen inside the ``aclosing`` block so that the
                # close performed on exit runs under the rescheduled deadline.
                timeout.reschedule(-1)
    finally:
        async with asyncio.timeout(1):
            await agen.aclose()
# Script entry point: run the demo coroutine on a new event loop.
asyncio.run(amain())
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment