Skip to content

Instantly share code, notes, and snippets.

@abrekken
Last active June 19, 2019 00:32
Show Gist options
  • Save abrekken/e9f0c5c25309520e1c0ee5ca41b9c4c6 to your computer and use it in GitHub Desktop.
Save abrekken/e9f0c5c25309520e1c0ee5ca41b9c4c6 to your computer and use it in GitHub Desktop.
@Bean
public KStream<?, ?> windowedAggregation(StreamsBuilder streamsBuilder) {
WindowedSerializer<String> windowedSerializer = new TimeWindowedSerializer<>(new StringSerializer());
TimeWindowedDeserializer<String> windowedDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer());
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
KStream<String, IotEvent> stream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), new JsonSerde<>(IotEvent.class)));
stream
.peek((key, value) -> LOG.debug("incoming message: {} {}", key, value))
//re-partition our data on a new key
.groupBy((key, value) -> eventKey(value), Grouped.with(Serdes.String(), new JsonSerde<>(IotEvent.class)))
//segment our messages into 10-minute windows
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
// aggregate and configure our state-store with a 30 day retention period
.aggregate(IotEventMetric::new,
(key, newValue, aggregate) -> aggregate.add(newValue),
Materialized.<String, IotEventMetric, WindowStore<Bytes, byte[]>>as("eventstore")
.withValueSerde(new JsonSerde<>(IotEventMetric.class))
.withRetention(Duration.ofDays(30))
)
.toStream()
.peek((key, value) -> LOG.debug("Windowed aggregation, key {} and value {}", key, value))
//publish our results to a topic
.to("output-topic", Produced.with(windowedSerde, new JsonSerde<>(IotEventMetric.class)));
return stream;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment