Skip to content

Instantly share code, notes, and snippets.

@tmoerman
Last active January 2, 2018 00:36
Show Gist options
  • Save tmoerman/903a49b7a4a0ed9155a03c72831a5263 to your computer and use it in GitHub Desktop.
Save tmoerman/903a49b7a4a0ed9155a03c72831a5263 to your computer and use it in GitHub Desktop.
core.async combine-latest
(defn combine-latest
"Accepts a collection of channels, an optional selector function f and an option output
channel. Returns a channel with the latest values of the input values combined by the
selector function. If no selector function is specified, a vector will be returned.
The output channel closes when any of the input channels closes.
Inspired by http://rxmarbles.com/#combineLatest"
([chs] (combine-latest (chan) vector chs))
([f chs] (combine-latest (chan) f chs))
([out f chs]
(assert some? chs)
(let [ch->idx (->> chs
(map-indexed (fn [i x] [x i]))
(into {}))
nil-vec (-> (count chs) (repeat nil) (vec))]
(go-loop [state nil-vec]
(let [[v ch] (a/alts! chs)]
(if (some? v)
(let [idx (ch->idx ch)
new-state (assoc state idx v)]
(when (every? some? new-state) ; emit when a value is present for every index
(>! out (apply f new-state)))
(recur new-state))
(close! out)))) ; close out when one of the input channels is closed
out)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment