When creating streams from an asynchronous process, the idiomatic thing is to create a stream from that process at the earliest possible stage, rather than using a queue to invert control after the fact. See the creating streams examples - generally, you just use the Process.eval
and Process.repeatEval
functions to build a stream by running some asynchronous task repeatedly.
That said, if you have some existing logic that you need to bind to that's already based on callbacks and side effects, you can use the functions in scalaz.stream.async
. Here's an example, using a queue to invert control:
import scalaz.stream.async
val (q, src) = async.queue[Int]
// Thread 1