Skip to content

Instantly share code, notes, and snippets.

@stereobutter
stereobutter / example.py
Last active August 18, 2022 20:53
soft cancellation with trio
import trio
from .soft_cancel_scope import SoftCancelScope
async def main():
async def do_stuff(num):
print(f'starting job {num} at t={trio.current_time()-START_TIME}')
await trio.sleep(1)
print(f'finished job {num} at t={trio.current_time()-START_TIME}')
@stereobutter
stereobutter / cattr_field.py
Created November 21, 2021 11:00
`cattrs` + `pydantic`
import cattr
class Cattrs:
"""A pydantic compatible field class that uses `cattrs` to structure data """
def __hash__(self):
return hash((type(self), self._model))
def __eq__(self, other):
return hash(self) == hash(other)
@stereobutter
stereobutter / example.py
Created November 17, 2021 18:21
TrioLoop
import trio
from .trio_loop import TrioLoop
async def foo(data):
await trio.sleep(1)
return data
with TrioLoop() as loop:
data = loop.wait(foo('hello world'))
print(data)
@stereobutter
stereobutter / example.py
Last active June 23, 2021 15:01
graceful termination with trio
from .lifetime import Lifetime
from contextlib import contextmanager
@contextmanager
def merge_cancellation():
try:
yield
except trio.MultiError as wrapper:
errors = wrapper.exceptions