@kapilreddy
Last active August 29, 2015 14:10
Core.async message consumer with safe shutdown
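(ns core-async-kafka.core
  (:require [clojure.core.async
             :refer [chan buffer thread >!! <!! close! sliding-buffer]]))

(defn process-message
  "Simulates slow processing of msg and reports progress on status-chan."
  [msg status-chan]
  (>!! status-chan :in-progress)
  ;; Emulate up to 10 seconds of work
  (Thread/sleep (rand-int 10000))
  (println (format "Consumed message %s" msg))
  (>!! status-chan :success))

(defn consume-messages
  "Starts a producer and a consumer and returns a zero-argument function
  that shuts the pipeline down."
  []
  ;; messages-chan has a fixed buffer of size 1: puts block while the
  ;; buffer is full. status-chan has a sliding buffer of size 1: puts
  ;; never block, and only the most recent status is kept.
  (let [messages-chan (chan (buffer 1))
        status-chan (chan (sliding-buffer 1))]
    ;; Producer
    (future (loop [n 0]
              ;; Emulate real traffic
              (Thread/sleep (rand-int 1000))
              ;; Blocks if the buffer is full.
              ;; Returns false if the channel is closed, which ends the loop.
              (when (>!! messages-chan n)
                (println (format "Pushed message %s" n))
                (recur (inc n)))))
    ;; Consumer. process-message blocks (Thread/sleep, >!!), so it runs on
    ;; a dedicated thread rather than in a go block.
    (thread (loop []
              ;; Blocks until a message is available.
              ;; Returns nil once the channel is closed and drained.
              (if-let [msg (<!! messages-chan)]
                (do
                  (process-message msg status-chan)
                  (recur))
                (println "Finished consuming, dying ..."))))
    (fn []
      ;; Wait until the consumer reports :success for a message, then
      ;; close the channel so the producer stops.
      (loop []
        (when (not= (<!! status-chan) :success)
          (recur)))
      (close! messages-chan)
      (println "Finished consuming, dying complete ..."))))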
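
;; Usage sketch, not part of the original gist. `stop-consumer!` is a
;; hypothetical name for the function returned by consume-messages.
;; Calling it blocks until the consumer reports a successfully processed
;; message, so it will wait if nothing has been consumed yet.
(comment
  (def stop-consumer! (consume-messages))
  ;; ... let the producer and consumer run for a while ...
  (stop-consumer!))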