I've been using this code "in production" for a while now. It dates back to https://groups.google.com/g/python-tulip/c/J7tCcBU5TPA/m/NM7iBhhhEAAJ except that I converted it to Trio a while ago. It is intended to be lossless - if desired, you can ensure to catch all messages since you start to listen, without losing anything. Producers are blocked on send()
until the slowest consumer has received the message.
Since new consumers won't receive messages from before they began to listen, the point at which a consumer "begins listening" is important. This happens when the async iterator is created - ie. when the for loop runs the implicit aiter()
. If you do this as the first thing in a coroutine, you might expect all message following a nursery.start_soon()
call starting that coroutine to be picked up. But in practice, the for loop won't run the implicit aiter()
until some time later, and so you won't see messages sent prior to that point. To avoid this, you must all aiter()
yourself and pass that in, o