When creating streams from an asynchronous process, the idiomatic thing is to create a stream from that process at the earliest possible stage, rather than using a queue to invert control after the fact. See the creating streams examples - generally, you just use the Process.eval
and Process.repeatEval
functions to build a stream by running some asynchronous task repeatedly.
That said, if you have some existing logic that you need to bind to that's already based on callbacks and side effects, you can use the functions in scalaz.stream.async
. Here's an example, using a queue to invert control:
import scalaz.stream.async
val (q, src) = async.queue[Int]
// Thread 1
q.enqueue(1) // ordinary side-effecting calls
q.enqueue(2)
...
q.close
// Thread 2
src: Process[Task,Int]
src.take(10).to(snk).run
So, async.queue
returns a mutable Queue
, which can be populated by the producer, and on the other side, the consumer gets back an ordinary stream. It would be preferable (but not always possible) to just modify Thread 1 to just create the stream directly, rather than having the side effects of dumping to a mutable queue.
Similarly, there's also scalaz.stream.async.signal
, which represents a (possibly) continuously updated value of some type. From a signal, you can obtain the discrete stream of when it changes, or the continous current value:
import scalaz.stream.async
val batchSize = async.signal[Int]
val now: Process[Task,Int] = batchSize.continuous
val onChange: Process[Task,Int] = batchSize.discrete
// Thread 1
batchSize.value.set(10)
..
batchSize.value.set(25)
// Thread 2
batchSize.discrete.take(10).map ...
You can also bind to an actor or any other callback-y API to get back a stream:
import scalaz.concurrent.Actor
case class M(cb: Throwable \/ MoarBytes => Unit)
val a = Actor.actor[M] { case M(cb) =>
...
val result = reallyExpensiveOp(r)
cb(result)
}
val t: Process[Task,MoarBytes] =
Process.repeatEval(Task.async { cb => a ! M(cb) })
t.filter(_.canHazBytes).map(foo).fold(..)