Skip to content

Instantly share code, notes, and snippets.

@Jayasagar
Created December 19, 2017 13:12
Show Gist options
  • Save Jayasagar/e56d6dad2e47f6572b4c8d7ca430f438 to your computer and use it in GitHub Desktop.
Save Jayasagar/e56d6dad2e47f6572b4c8d7ca430f438 to your computer and use it in GitHub Desktop.
Graph<ClosedShape, Consumer.Control> completionStageGraph = GraphDSL.create(source, (builder, sourceShape) -> {
// Add Broadcast
// Add Flows
// Add Sinks
builder
.from(...)
.viaFanOut(...)
.via(...)
.to(...);
builder
.from(...)
.via(...)
.to(...);
return ClosedShape.getInstance();
});
// Run the stream and supervise
RunnableGraph.fromGraph(completionStageGraph)
.run(materializer)
.isShutdown()
.whenComplete((done, throwable) -> {
log.info("The underlying `KafkaConsumer` has been closed.");
getSelf().tell(PoisonPill.getInstance(), getSelf());
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment