Skip to content

Instantly share code, notes, and snippets.

@abrekken
Last active June 3, 2020 17:18
Show Gist options
  • Save abrekken/e39aa191d052ad883b985c738c6bccf9 to your computer and use it in GitHub Desktop.
Save abrekken/e39aa191d052ad883b985c738c6bccf9 to your computer and use it in GitHub Desktop.
@Bean
public KStream<?, ?> kstreamProcessorApi(StreamsBuilder streamsBuilder) {
StoreBuilder<KeyValueStore<String, IotEvent>> eventStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("eventstore"),
Serdes.String(), iotEventSerde());
// register store
streamsBuilder.addStateStore(eventStoreBuilder);
KStream<String, IotEvent> stream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), iotEventSerde()));
stream
.process(() -> new Processor<String, IotEvent>() {
private KeyValueStore<String, IotEvent> stateStore;
@Override
public void init(ProcessorContext context) {
stateStore = (KeyValueStore<String, IotEvent>) context.getStateStore("eventstore");
context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
//iterate over all records and remove them based on some criteria
stateStore.all().forEachRemaining(keyValue -> {
if (keyValue.value.getEventType() == EventType.PROCESSED) {
stateStore.delete(keyValue.key);
}
});
});
}
// additional logic omitted
;
return stream;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment