Skip to content

Instantly share code, notes, and snippets.

@Bean
public KStream<?, ?> kstreamTombstone(StreamsBuilder streamsBuilder) {
KStream<String, IotEvent> inbound = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), iotEventSerde()));
inbound
.peek((key, value) -> LOG.debug("incoming message: {} {}", key, value))
//check for a null value (tombstone) and set delete flag if necessary
.mapValues((readOnlyKey, value) -> {
if (value == null) {
value = new IotEvent();
value.setDelete(true);
@Bean
public KStream<?, ?> kstreamProcessorApi(StreamsBuilder streamsBuilder) {
StoreBuilder<KeyValueStore<String, IotEvent>> eventStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("eventstore"),
Serdes.String(), iotEventSerde());
// register store
streamsBuilder.addStateStore(eventStoreBuilder);
KStream<String, IotEvent> stream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), iotEventSerde()));
stream
.process(() -> new Processor<String, IotEvent>() {
@Bean
public KStream<?, ?> windowedAggregation(StreamsBuilder streamsBuilder) {
WindowedSerializer<String> windowedSerializer = new TimeWindowedSerializer<>(new StringSerializer());
TimeWindowedDeserializer<String> windowedDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer());
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
KStream<String, IotEvent> stream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), new JsonSerde<>(IotEvent.class)));
stream
.peek((key, value) -> LOG.debug("incoming message: {} {}", key, value))
//re-partition our data on a new key
@Bean
public KStream<?, ?> kstreamAggregation(StreamsBuilder streamsBuilder) {
KStream<String, IotEvent> stream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), iotEventSerde()));
stream
.peek((key, value) -> LOG.debug("incoming message: {} {}", key, value))
//re-partition our data on a new key
.groupBy((key, value) -> eventKey(value), Grouped.with(Serdes.String(), iotEventSerde()))
// add our incoming newValue into the aggregate
.aggregate(IotEventMetric::new,
(key, newValue, aggregate) -> aggregate.add(newValue),