Skip to content

Instantly share code, notes, and snippets.

@rocketpages
Created March 3, 2016 19:07
Show Gist options
  • Save rocketpages/9ee1523db698edc4cfb5 to your computer and use it in GitHub Desktop.
Save rocketpages/9ee1523db698edc4cfb5 to your computer and use it in GitHub Desktop.
// @formatter:off
val g = RunnableGraph.fromGraph(GraphDSL.create() {
implicit builder =>
import GraphDSL.Implicits._
// Source
val A: Outlet[String] = builder.add(Source.fromIterator(() => flightDelayLines)).out
// Flows
val B: FlowShape[String, FlightEvent] = builder.add(csvToFlightEvent)
val C: FlowShape[FlightEvent, FlightDelayRecord] = builder.add(filterAndConvert)
val D: UniformFanOutShape[FlightDelayRecord, FlightDelayRecord] = builder.add(Broadcast[FlightDelayRecord](2))
val F: FlowShape[FlightDelayRecord, (String, Int, Int)] = builder.add(averageCarrierDelay)
// Sinks
val E: Inlet[Any] = builder.add(Sink.ignore).in
val G: Inlet[Any] = builder.add(Sink.foreach(averageSink)).in
// Graph
A ~> B ~> C ~> D
E <~ D
G <~ F <~ D
ClosedShape
})
// @formatter:on
g.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment