@abrekken
Last active April 19, 2020 19:09
@Bean
public KStream<?, ?> kstreamTombstone(StreamsBuilder streamsBuilder) {
    KStream<String, IotEvent> inbound =
            streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), iotEventSerde()));
    inbound
            .peek((key, value) -> LOG.debug("incoming message: {} {}", key, value))
            // check for a null value (tombstone) and set the delete flag if necessary
            .mapValues((readOnlyKey, value) -> {
                if (value == null) {
                    value = new IotEvent();
                    value.setDelete(true);
                }
                return value;
            })
            // re-partition our data on a new key
            .groupBy((key, value) -> eventKey(value), Grouped.with(Serdes.String(), iotEventSerde()))
            // check for the delete flag and return null if it is set
            .aggregate(IotEventMetric::new,
                    (key, value, aggregate) -> {
                        if (value.isDelete()) {
                            return null;
                        }
                        return aggregate.add(value);
                    },
                    Materialized.<String, IotEventMetric, KeyValueStore<Bytes, byte[]>>as("eventstore")
                            .withValueSerde(iotEventMetricSerde())
            )
            .toStream()
            // publish our results to a topic
            .to("output-topic", Produced.with(Serdes.String(), iotEventMetricSerde()));
    return inbound;
}
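
To make the delete-flag idea easier to follow without a running Kafka cluster, here is a minimal plain-Java sketch of the aggregate step only. It is not Kafka Streams code: the class `TombstoneAggregateSketch`, the `Event` and `Metric` stand-ins, and the `HashMap` standing in for the "eventstore" state store are all hypothetical names invented for illustration. It assumes, as the comments in the snippet suggest, that a null result from the aggregator removes the entry for that key.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the aggregate step; no Kafka dependency.
// All names below are hypothetical and exist only for this illustration.
class TombstoneAggregateSketch {

    // Stand-in for IotEvent: just a delete flag and one numeric reading.
    static final class Event {
        final boolean delete;
        final double reading;

        Event(boolean delete, double reading) {
            this.delete = delete;
            this.reading = reading;
        }

        // Placeholder that replaces a null (tombstone) value.
        static Event tombstone() {
            return new Event(true, 0.0);
        }
    }

    // Stand-in for IotEventMetric: counts events and sums their readings.
    static final class Metric {
        long count;
        double sum;

        Metric add(Event e) {
            count++;
            sum += e.reading;
            return this;
        }
    }

    // Stand-in for the "eventstore" state store.
    final Map<String, Metric> store = new HashMap<>();

    // Mirrors the aggregator lambda: null when the delete flag is set,
    // otherwise the updated aggregate.
    static Metric aggregate(String key, Event value, Metric aggregate) {
        if (value.delete) {
            return null;
        }
        return aggregate.add(value);
    }

    // Applies one record to the store; a null result removes the key (assumed semantics).
    void process(String key, Event value) {
        Metric current = store.getOrDefault(key, new Metric()); // plays the role of IotEventMetric::new
        Metric updated = aggregate(key, value, current);
        if (updated == null) {
            store.remove(key);
        } else {
            store.put(key, updated);
        }
    }

    public static void main(String[] args) {
        TombstoneAggregateSketch sketch = new TombstoneAggregateSketch();
        sketch.process("sensor-1", new Event(false, 2.5));
        sketch.process("sensor-1", new Event(false, 1.5));
        System.out.println(sketch.store.get("sensor-1").count);    // prints 2
        sketch.process("sensor-1", Event.tombstone());
        System.out.println(sketch.store.containsKey("sensor-1"));  // prints false
    }
}
```

The sketch deliberately skips the re-keying step, so it says nothing about how a tombstone finds its way to the right group key in the real topology.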
@suikast42

I am stuck on the basics of working with KStream and grouping.
Can you provide the implementations of iotEventSerde() and eventKey(value)?

@abrekken
Author

@suikast42 Hi there, the eventKey(value) method could return any string - it's just an example of building a new key to aggregate on. And the iotEventSerde() method would just return an Avro Serde - or whatever Serde you need for the data you're working with. Does that make sense?
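
As a rough illustration of the two helpers discussed above, here is a minimal plain-Java sketch. Everything in it is an assumption: the `IotEvent` fields (`deviceId`, `eventType`, `reading`), the composite key format, and the comma-separated byte encoding are invented for this example and are not taken from the gist, which would use an Avro Serde or whatever fits the data. The `serialize` and `deserialize` methods only show the null-safe round trip a Serde has to provide; in a real project they would sit behind Kafka's `Serializer` and `Deserializer` interfaces and be combined with `Serdes.serdeFrom(serializer, deserializer)`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for eventKey(value) and the logic behind iotEventSerde().
class IotEventKeySketch {

    // Hypothetical event shape; the real IotEvent is not shown in the gist.
    static final class IotEvent {
        final String deviceId;
        final String eventType;
        final double reading;

        IotEvent(String deviceId, String eventType, double reading) {
            this.deviceId = deviceId;
            this.eventType = eventType;
            this.reading = reading;
        }
    }

    // Builds the new grouping key; any string derived from the event would do.
    static String eventKey(IotEvent event) {
        return event.deviceId + ":" + event.eventType;
    }

    // Encodes an event as UTF-8 "deviceId,eventType,reading".
    // Assumes deviceId and eventType contain no commas. Null stays null.
    static byte[] serialize(IotEvent event) {
        if (event == null) {
            return null;
        }
        String line = event.deviceId + "," + event.eventType + "," + event.reading;
        return line.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes the format written by serialize(). Null bytes (a tombstone) stay null.
    static IotEvent deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split(",", 3);
        return new IotEvent(parts[0], parts[1], Double.parseDouble(parts[2]));
    }

    public static void main(String[] args) {
        IotEvent event = new IotEvent("dev-1", "temperature", 21.5);
        System.out.println(eventKey(event));                          // prints dev-1:temperature
        System.out.println(deserialize(serialize(event)).reading);    // prints 21.5
    }
}
```

One thing to keep in mind: the placeholder that the snippet creates for a tombstone is a fresh `IotEvent` with only the delete flag set, so a key function that reads fields from the value needs some way to handle that case.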

@suikast42
Copy link

I figured it out. Thanks a lot :-D
