Created
December 30, 2022 10:30
-
-
Save leonoel/b4ec06fe2df30764217977918084fda9 to your computer and use it in GitHub Desktop.
Missionary implementation of a Kafka consumer with backpressure and manual offset commits
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{:deps {org.clojure/clojure {:mvn/version "1.11.1"}
        org.apache.kafka/kafka-clients {:mvn/version "3.3.1"}
        missionary/missionary {:mvn/version "b.26"}}}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns kafka-consumer | |
(:require [missionary.core :as m]) | |
(:import (org.apache.kafka.clients.consumer ConsumerConfig KafkaConsumer ConsumerRecords ConsumerRecord OffsetAndMetadata) | |
(org.apache.kafka.common TopicPartition) | |
(java.util Map Collection) | |
(java.time Duration))) | |
(defn init-consumer
  "Build a `KafkaConsumer` from configuration map `params` and subscribe it to `topic`
  plus any additional `topics`. Returns the subscribed consumer."
  [params topic & topics]
  (let [consumer (KafkaConsumer. ^Map params)]
    ;; `list*` with two arguments is `(cons topic topics)`: a seq that is also a java.util.Collection.
    (.subscribe consumer ^Collection (list* topic topics))
    consumer))
(defn poll-consumer
  "Build a task that, when run, polls `KafkaConsumer` `consumer` on executor `exec` with a
  timeout of `millis` milliseconds, and completes with the resulting `ConsumerRecords`."
  [consumer millis exec]
  (m/via exec
    (let [^KafkaConsumer c consumer
          timeout (Duration/ofMillis (long millis))]
      (.poll c timeout))))
(defn close-consumer
  "Build a task that, when run, closes `KafkaConsumer` `consumer` on executor `exec` with a
  timeout of `millis` milliseconds."
  [consumer millis exec]
  (m/via exec
    (let [^KafkaConsumer c consumer
          timeout (Duration/ofMillis (long millis))]
      (.close c timeout))))
(defn commit-consumer
  "Build a task that, when run, synchronously commits the `TopicPartition` -> `OffsetAndMetadata`
  map `offsets` through `KafkaConsumer` `consumer` on executor `exec`, with a timeout of
  `millis` milliseconds."
  [consumer offsets millis exec]
  (m/via exec
    (let [^KafkaConsumer c consumer
          ^Map pending offsets
          timeout (Duration/ofMillis (long millis))]
      (.commitSync c pending timeout))))
(defn pause-consumer!
  "Pause every `TopicPartition` in collection `topic-partitions` on `KafkaConsumer` `consumer`.
  Runs on the calling thread."
  [consumer topic-partitions]
  (let [^KafkaConsumer c consumer
        ^Collection tps topic-partitions]
    (.pause c tps)))
(defn resume-consumer!
  "Resume every `TopicPartition` in collection `topic-partitions` on `KafkaConsumer` `consumer`.
  Runs on the calling thread."
  [consumer topic-partitions]
  (let [^KafkaConsumer c consumer
        ^Collection tps topic-partitions]
    (.resume c tps)))
(defn get-and-set!
  "Atomically assign value `x` to atom `a` and return the previously assigned value.
  Delegates to `clojure.core/reset-vals!` (Clojure 1.9+), which returns `[old new]`,
  instead of hand-rolling a compare-and-set! retry loop."
  [a x]
  (first (reset-vals! a x)))
(defn commit-channel
  "Create an empty commit channel. It is an atom holding a map from `TopicPartition`
  to the `OffsetAndMetadata` waiting to be committed."
  []
  (atom (hash-map)))
(defn request-commit!
  "Request a commit for `ConsumerRecord` `record` on commit channel `chan`.
  Stores an `OffsetAndMetadata` for offset `(inc (.offset record))` and string `metadata`
  (default \"\") under the record's `TopicPartition`. Any entry already pending for that
  partition is replaced. Returns the channel's new map."
  ([chan record]
   (request-commit! chan record ""))
  ([chan ^ConsumerRecord record ^String metadata]
   (let [partition (TopicPartition. (.topic record) (.partition record))
         committed (OffsetAndMetadata. (inc (.offset record)) metadata)]
     (swap! chan assoc partition committed))))
(defn poll-records
  "Return a flow producing `ConsumerRecord`s by polling from `KafkaConsumer` instance `consumer`, pausing and resuming
  each topic partition according to downstream backpressure, and committing records pushed on commit channel `chan`.
  Blocking operations are performed on executor `exec`.

  Before each poll, all offsets pending on `chan` are taken off the channel and committed synchronously.
  Every partition returned by a poll is paused. Once all of its records from that poll have been emitted
  downstream, it is resumed just before a later poll. Partitions that are no longer assigned to `consumer`
  at that point (e.g. revoked by a rebalance) are skipped, because `KafkaConsumer.resume` throws
  `IllegalStateException` for unassigned partitions.

  `millis` (default 1000) is the timeout, in milliseconds, used for each poll and each commit."
  ([chan consumer exec]
   (poll-records chan consumer exec 1000))
  ([chan ^KafkaConsumer consumer exec millis]
   (m/ap
     (let [resume (atom #{})]
       (loop []
         ;; Flush (and clear) the commits requested since the previous iteration.
         (m/? (commit-consumer consumer (get-and-set! chan {}) millis exec))
         (let [^ConsumerRecords consumer-records (m/? (poll-consumer consumer millis exec))
               topic-partitions (.partitions consumer-records)]
           ;; Stop fetching these partitions until their current records have been emitted.
           (pause-consumer! consumer topic-partitions)
           (m/amb=
             ;; Branch 1: emit records partition by partition, then mark each drained partition for resumption.
             (loop [topic-partitions (seq topic-partitions)]
               (if-some [[^TopicPartition topic-partition & topic-partitions] topic-partitions]
                 (m/amb> (m/?> (m/seed (.records consumer-records topic-partition)))
                         (do (swap! resume conj topic-partition)
                             (recur topic-partitions)))
                 (m/amb>)))
             ;; Branch 2: resume drained partitions that are still assigned, then poll again.
             (let [assigned (set (.assignment consumer))]
               (resume-consumer! consumer (filterv assigned (get-and-set! resume #{})))
               (recur)))))))))
(comment
  ;; REPL example session: evaluate the forms below one at a time.

  (defn process-and-commit
    "Return a task that runs `process` (record -> task) on each record polled from `consumer`,
    one record at a time, and requests a commit for each record after its task succeeds."
    [process consumer exec]
    (->> (m/ap
           (let [commits (commit-channel)
                 record (m/?> (poll-records commits consumer exec))]
             (m/? (process record))
             (request-commit! commits record)))
         ;; Keep only the latest value; the flow is run for its side effects.
         (m/reduce (fn [_ x] x) nil)))

  (defn process-record
    "Return a task that logs `record`, sleeps a random delay under 2000 ms, then logs completion."
    [record]
    (m/sp
      (println "Processing" record)
      (let [pause-ms (rand-int 2000)]
        (m/? (m/sleep pause-ms)))
      (println "Completed" record)))

  (defn process-topics
    "Return a task that creates a consumer from `args` (passed to `init-consumer`), processes its
    records, and always closes the consumer afterwards (the close itself is not cancellable)."
    [& args]
    (m/sp
      (let [consumer (apply init-consumer args)
            shutdown (m/compel (close-consumer consumer 1000 m/blk))]
        (try
          (m/? (process-and-commit process-record consumer m/blk))
          (finally
            (m/? shutdown))))))

  ;; Local broker, manual commits, small batches, start from the earliest offset.
  (def config
    {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9092"
     ConsumerConfig/GROUP_ID_CONFIG "test-consumer-123"
     ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringDeserializer"
     ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringDeserializer"
     ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG "false"
     ConsumerConfig/MAX_POLL_RECORDS_CONFIG "5"
     ConsumerConfig/AUTO_OFFSET_RESET_CONFIG "earliest"})

  ;; Start the task; `cancel` holds the cancellation function it returns.
  (def cancel
    (let [task (process-topics config "quickstart-events")
          on-success (fn [_] (println "Done."))
          on-failure (fn [^Throwable e] (.printStackTrace e))]
      (task on-success on-failure)))

  ;; Stop consuming.
  (cancel)
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment