Last active
February 22, 2016 15:20
-
-
Save timperrett/609de46bb647c682ce84 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* discovered njoin on this thread: https://github.com/functional-streams-for-scala/fs2/issues/251 - which also cites (adjusted to match example below): | |
If we increase maxQueued up to 100, and `handle` emits multiple `A` values, up to 100 will be prefetched | |
before we block on the consumer of results. (Not actually blocking by occupying a thread, but semantically blocking.) | |
If we increase maxOpen to 50, then we can have 50 `item` streams processing concurrently. | |
*/ | |
import scalaz.stream.{Sink,Process,sink,channel} | |
import scalaz.stream.nondeterminism.njoin | |
import scalaz.concurrent.Task | |
case class Foo(arb: Int) | |
val items: Process[Task,Foo] = | |
Process.emitAll(for(i <- 0 to 10000) yield Foo(i)) | |
val rnd = new scala.util.Random | |
val effects: Sink[Task, Foo] = | |
sink.lift(foo => Task.delay { | |
// add some delay so the numbers obviously process out of order when printed to the console. | |
// never do this in your real code!! | |
val start = 100 | |
val end = 400 | |
Thread.sleep(start + rnd.nextInt( (end - start) + 1 )) | |
println(s"processing $foo") | |
}) | |
def handle(item: Foo): Process[Task,Unit] = | |
Process.emit(item).to(effects) | |
val handled: Process[Task,Process[Task,Unit]] = items.map(handle) | |
val results: Process[Task,Unit] = | |
njoin(maxOpen = 10, maxQueued = 1)(handled) | |
// print all the things to the console! | |
results.run.run | |
/* | |
You should then see the following output (or something like it, at least): | |
processing Foo(2) | |
processing Foo(1) | |
processing Foo(5) | |
processing Foo(3) | |
processing Foo(4) | |
processing Foo(0) | |
processing Foo(9) | |
processing Foo(6) | |
processing Foo(7) | |
processing Foo(10) | |
processing Foo(8) | |
.... | |
*/ | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment