Skip to content

Instantly share code, notes, and snippets.

@abrekken
Last active July 25, 2019 02:45
Show Gist options
  • Save abrekken/79f29502e218c4ff26dba778b34d0e49 to your computer and use it in GitHub Desktop.
Save abrekken/79f29502e218c4ff26dba778b34d0e49 to your computer and use it in GitHub Desktop.
@Bean
public KStream<?, ?> kstreamAggregation(StreamsBuilder streamsBuilder) {
KStream<String, IotEvent> stream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), iotEventSerde()));
stream
.peek((key, value) -> LOG.debug("incoming message: {} {}", key, value))
//re-partition our data on a new key
.groupBy((key, value) -> eventKey(value), Grouped.with(Serdes.String(), iotEventSerde()))
// add our incoming newValue into the aggregate
.aggregate(IotEventMetric::new,
(key, newValue, aggregate) -> aggregate.add(newValue),
Materialized.<String, IotEventMetric, KeyValueStore<Bytes, byte[]>>as("eventstore")
.withValueSerde(iotEventMetricSerde())
)
.toStream()
//publish our results to a topic
.to("output-topic", Produced.with(Serdes.String(), iotEventMetricSerde()));
return stream;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment