@lnostdal
Last active January 10, 2019 02:29
clojure.core.async: consume channel buffer without blocking
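;; www.Quanto.ga
;;
;; I've found this to be useful at times:
(require '[clojure.core.async :as async])

(defn async-consume-buffer
  "Consume as many values from `ch` as possible without blocking.
  Once a take would block (i.e. the buffer is empty) or `ch` is closed, the
  values taken so far are returned as a vector. If `first-request-blocking?`
  is true, the first take blocks until a value arrives or `ch` is closed."
  ([ch]
   (async-consume-buffer ch false))
  ([ch first-request-blocking?]
   (loop [e (if first-request-blocking?
              (async/<!! ch)
              (async/poll! ch))
          v (transient [])]
     ;; nil means nothing was available; `some?` keeps a buffered `false`.
     (if (some? e)
       (recur (async/poll! ch) (conj! v e))
       (persistent! v)))))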
quantataraxia.core> (let [ch (async/chan 10)]
                      (async/onto-chan ch [1 2 3] false)
                      (Thread/sleep 1000)
                      (async-consume-buffer ch))
[1 2 3]
quantataraxia.core> (let [ch (async/chan 10)]
                      (async/onto-chan ch [1 2 3] false)
                      ;;(Thread/sleep 1000)
                      (async-consume-buffer ch))
[]
quantataraxia.core> (let [ch (async/chan 10)]
                      (async/onto-chan ch [1 2 3] false)
                      ;;(Thread/sleep 1000)
                      (async-consume-buffer ch true))
[1 2 3]
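
The different results above come down to timing: `onto-chan` returns immediately and copies the collection in the background, so a non-blocking consume can run before anything has reached the buffer, and the blocking variant only guarantees the first value. A minimal sketch of the same draining idea with the race removed, assuming the buffer is filled synchronously with `>!!` first. The name `drain-buffer` is hypothetical and not part of the gist; the `false` element is included only to show that testing for nil rather than truthiness keeps it.

```clojure
(require '[clojure.core.async :as async])

(defn drain-buffer
  "Hypothetical helper: takes from `ch` with poll! until poll! returns nil,
  i.e. until nothing is immediately available."
  [ch]
  (loop [acc []]
    ;; if-some only stops on nil, so a buffered `false` is still collected.
    (if-some [v (async/poll! ch)]
      (recur (conj acc v))
      acc)))

(let [ch (async/chan 10)]
  ;; The buffer has room for 10 values, so each >!! completes before the
  ;; next form runs and all three values are buffered when draining starts.
  (doseq [x [1 false 3]]
    (async/>!! ch x))
  (drain-buffer ch))
;; => [1 false 3]
```

Because the puts have finished before `drain-buffer` is called, the result here does not depend on a sleep.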