@sosuren
Created May 4, 2017 17:10
Broadcast to two sinks, one of them delayed
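import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

// Setup assumed here; the original snippet left it out.
implicit val system = ActorSystem("broadcast-delay")
implicit val materializer = ActorMaterializer()

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val so = builder.add(Source.fromIterator(() => List(1, 2, 3).iterator)).out
  // Running sum: scan emits the seed first, so this yields 0, 1, 3, 6
  val p1 = builder.add(Flow[Int].scan(0)((acc, next) => acc + next))
  // Holds each element back by one second
  val p2 = builder.add(Flow[Int].delay(1.second))
  val b = builder.add(Broadcast[Int](2))
  val sa = builder.add(Sink.foreach[Int](item => println(s"-=> Sink A: $item"))).in
  val sb = builder.add(Sink.foreach[Int](item => println(s"-=> Sink B: $item"))).in

  so ~> p1 ~> b
  sa <~ b        // Sink A is fed directly from the broadcast
  sb <~ p2 <~ b  // Sink B is fed through the delay
  ClosedShape
})

g.run()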
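
For comparison, a minimal sketch of the same topology without GraphDSL, using alsoToMat to attach the delayed sink as a side branch. This is not part of the original gist: the object name AlsoToSketch, the system name, and the shutdown logic are illustrative assumptions, and ActorMaterializer assumes an Akka 2.5-era setup (on Akka 2.6+ an implicit ActorSystem in scope should be enough). One known difference: alsoTo wires its internal broadcast with eager cancellation, so cancellation behaviour may not match the graph above exactly.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical wrapper object, not from the original gist.
object AlsoToSketch extends App {
  implicit val system: ActorSystem = ActorSystem("also-to-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // execution context for the Future callbacks below

  // Sink A: prints each element as soon as it arrives.
  val sinkA: Sink[Int, Future[Done]] =
    Sink.foreach[Int](item => println(s"-=> Sink A: $item"))

  // Sink B: the same kind of sink behind a one-second delay.
  val delayedSinkB: Sink[Int, Future[Done]] =
    Flow[Int]
      .delay(1.second)
      .toMat(Sink.foreach[Int](item => println(s"-=> Sink B: $item")))(Keep.right)

  // alsoToMat sends every element to delayedSinkB as well as downstream.
  val (doneB, doneA) = Source(List(1, 2, 3))
    .scan(0)(_ + _) // running sum, seed included
    .alsoToMat(delayedSinkB)(Keep.right)
    .toMat(sinkA)(Keep.both)
    .run()

  // Shut down only after both branches finish, so the delayed one is not cut off.
  doneA.flatMap(_ => doneB).onComplete(_ => system.terminate())
}
```

Keeping both materialized futures matters here: terminating the actor system as soon as Sink A completes could stop the stream while the delayed branch still holds elements.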