Skip to content

Instantly share code, notes, and snippets.

@KingCode
Last active October 1, 2018 13:58
Show Gist options
  • Save KingCode/8bc23748acb0b0421a2b2f96a15d207a to your computer and use it in GitHub Desktop.
Save KingCode/8bc23748acb0b0421a2b2f96a15d207a to your computer and use it in GitHub Desktop.
An implementation of the transducer API from scratch, as a learning aid.
(ns scratch.transducer
(:refer-clojure :exclude [comp map filter mapcat take drop
take-while drop-while
dedupe
distinct
reductions
;; sequence
transduce]))
;; STEP FUNCTION:
;; The traditional 2-arity function passed to clojure.core/reduce,
;; which takes the accumulator and the element being processed, and yields
;; the accumulator for the next step. When all items have been processed,
;; the accumulator is the result. Any 2-arity fn, including
;; all arithmetic fns, conj, cons, concat etc., can be a step function in a reducing
;; context, e.g. (reduce #(/ % %2) [1000 5 2 4 5 5]) ;; => 1
;; REDUCING FUNCTION:
;; A step function with 1- and 0-arity versions added. The 0-arity provides
;; an initial value, and the 1-arity yields the result (after all elements
;; have been processed).
;; The reducing function is the processor fn for the pipeline, and is usually
;; the result of nesting possibly many reducing functions, stitched together by
;; the transducer middleware.
;; Examples: +, *, conj (but not e.g. / or cons, unless adding custom arities)
;; TRANSDUCER:
;; A transducer is a function taking a reducing function (RF, or rf) F and returning
;; another reducing function G - in other words, transducers are middleware
;; for reducing functions.
;;
;; When invoked, G invokes F after doing its own work - usually invoking
;; the next rf in the pipeline, which could also be the last, e.g. conj
;; for a sequential output coll.
;;
;; Transducers are thus composable and the glue of the processing pipeline.
;; They enable decoupling of processing from both input data and output format.
;; Each element goes through the entire pipeline operation at once (an added
;; performance bonus).
;;
;; Transducers are created by transducer generating functions (factories)
;; such as #map, #filter et al.
;;
;; In order to be API compliant a transducer must return a full, 3-arities
;; reducing function, or exceptions will occur when the transducing context
;; tries to invoke the missing arities.
;;
;; The transducer factory fn usually takes a 1-arity function argument,
;; which is invoked on the current element within the step function.
;;
;; TRANSDUCING CONTEXT:
;; A transducing context puts together a transducer and input
;; and output methods (streams, collections, channels etc.)
;; Examples are transduce, sequence, 3-arity into, eduction.
(defn comp
  "Composes functions right-to-left: the returned function applies the
  rightmost function to its arguments, then feeds each result to the
  function on its left.

  (comp)        => identity
  (comp f)      => a variadic wrapper that applies f to its arguments
  (comp f g h)  => (fn [& xs] (f (g (apply h xs))))

  Applied to transducers, the leftmost transducer ends up outermost and
  therefore sees each input element first."
  ([] identity)
  ([f]
   (fn [& xs]
     (apply f xs)))
  ([f & fs]
   ;; Compose the tail once, up front, rather than on every invocation
   ;; of the returned function.
   (let [inner (apply comp fs)]
     (fn [& xs]
       (f (apply inner xs))))))
(defn reductions
  "Eagerly returns a seq of the intermediate values of reducing coll with f.

  ([f coll])      starts from the first element of coll; for an empty
                  coll returns a one-element list holding (f), the same
                  value reduce would yield for an empty coll.
  ([f init coll]) starts from init, which is always the first value.

  If f returns a (reduced v), v becomes the last value and the remaining
  input is not processed."
  ([f coll]
   (if-let [s (seq coll)]
     (reductions f (first s) (rest s))
     (list (f))))
  ([f init coll]
   (seq
    (reduce (fn [accs x]
              ;; peek reads the end of a vector in constant time; last
              ;; would walk the whole vector on every step.
              (let [v (f (peek accs) x)]
                (if (reduced? v)
                  ;; record the unwrapped value and stop the outer reduce
                  (reduced (conj accs @v))
                  (conj accs v))))
            [init]
            coll))))
(defn rf-addenda
  "Builds, as data, the two pass-through arity clauses that delegate to
  rf-sym: a 1-arity clause ([acc] (rf-sym acc)) and a 0-arity clause
  ([] (rf-sym)). Returns them in a vector, in that order."
  [rf-sym]
  (let [acc-sym 'acc
        completion-clause (list [acc-sym] (list rf-sym acc-sym))
        init-clause (list [] (list rf-sym))]
    [completion-clause init-clause]))
;; Example:
;;   (as-rf ([acc x] (rf acc (f x))) rf)
;;   => (fn ([acc x] (rf acc (f x))) ([acc] (rf acc)) ([] (rf)))
(defmacro as-rf
  "Expands to a three-arity reducing function.

  step-fn-args+body is a single arity clause of the form ([acc x] body...)
  and becomes the 2-arity step. The 1-arity (completion) and 0-arity (init)
  clauses are generated and simply delegate to rf-sym, the downstream
  reducing function."
  [step-fn-args+body rf-sym]
  ;; The generated clauses are the same two that rf-addenda builds,
  ;; spelled out inline here.
  (let [acc-sym 'acc
        completion-clause (list [acc-sym] (list rf-sym acc-sym))
        init-clause (list [] (list rf-sym))]
    (list `fn step-fn-args+body completion-clause init-clause)))
;;
;; Functions yielding transducers
;;
(defn mapping
  "Returns a transducer that applies f to each input and passes the result
  to the downstream reducing function."
  [f]
  (fn [rf]
    (as-rf ([result item]
            (let [mapped (f item)]
              (rf result mapped)))
           rf)))
(defn filtering
  "Returns a transducer that forwards only the inputs for which pred is
  truthy; any other input leaves the accumulator unchanged."
  [pred]
  (fn [rf]
    (as-rf ([result item]
            (if-not (pred item)
              result
              (rf result item)))
           rf)))
(defn removing
  "Returns a transducer that drops the inputs for which pred is truthy,
  i.e. filtering on the negation of pred."
  [pred]
  (filtering (fn [item] (not (pred item)))))
(defn mapcatting
  "Returns a transducer that applies f to each input, expecting a collection
  back, and feeds every element of that collection to the downstream
  reducing function.

  If the downstream function signals early termination with reduced, the
  signal is propagated to the enclosing reduction instead of being
  swallowed by the inner reduce."
  [f]
  (fn [rf]
    (let [step (fn [acc v]
                 (let [ret (rf acc v)]
                   ;; The inner reduce unwraps one level of reduced, so wrap
                   ;; a second time to keep the value reduced for the caller.
                   (if (reduced? ret)
                     (reduced ret)
                     ret)))]
      (as-rf
       ([acc x]
        (reduce step acc (f x)))
       rf))))
;; utility for count-down state
(defn done-pred
  "Returns a stateful zero-argument predicate that answers false for its
  first `steps` calls and true on every call after that."
  [steps]
  (let [remaining (atom (dec steps))]
    (fn []
      (let [exhausted? (neg? @remaining)]
        ;; only count down while there are still steps left
        (when-not exhausted?
          (swap! remaining dec))
        exhausted?))))
;; utility for predicate-based state that switches, once, from on to off.
(defn on->off-pred
  "Returns a stateful one-argument predicate wrapping pred. While it is on
  it returns whatever (pred v) returns; the first falsey result switches it
  off, and from then on it returns false without calling pred."
  [pred]
  (let [active? (atom true)]
    (fn [v]
      (if-not @active?
        false
        (let [verdict (pred v)]
          (when-not verdict
            (reset! active? false))
          verdict)))))
;;
;; Functions yielding stateful transducers
;;
(defn taking
  "Returns a stateful transducer that passes the first n inputs downstream
  and then terminates the reduction.

  Termination is signalled on the n-th input itself (via ensure-reduced),
  not when a further input arrives, so no extra element is pulled from the
  source. For n <= 0 the reduction is terminated on the first input without
  passing anything downstream."
  [n]
  (fn [rf]
    (let [remaining (atom n)]
      (as-rf
       ([acc x]
        (let [before @remaining
              after (swap! remaining dec)
              ret (if (pos? before)
                    (rf acc x)
                    acc)]
          ;; ensure-reduced leaves an already-reduced value from downstream
          ;; as it is, and wraps a plain value.
          (if (pos? after)
            ret
            (ensure-reduced ret))))
       rf))))
(defn taking-while
  "Returns a stateful transducer that passes inputs downstream while pred
  holds, and terminates the reduction (with reduced) at the first input
  for which it does not."
  [pred]
  (fn [rf]
    (let [still-taking? (on->off-pred pred)]
      (as-rf ([result item]
              (if-not (still-taking? item)
                (reduced result)
                (rf result item)))
             rf))))
(defn dropping
  "Returns a stateful transducer that ignores the first n inputs and passes
  every later input downstream."
  [n]
  (fn [rf]
    (let [past-prefix? (done-pred n)]
      (as-rf ([result item]
              (if-not (past-prefix?)
                result
                (rf result item)))
             rf))))
(defn dropping-while
  "Returns a stateful transducer that ignores inputs while pred holds; from
  the first input for which pred fails onwards, everything is passed
  downstream."
  [pred]
  (fn [rf]
    (let [in-prefix? (on->off-pred pred)]
      (as-rf ([result item]
              (cond
                (in-prefix? item) result
                :else (rf result item)))
             rf))))
(defn distinct
  "Returns a stateful transducer that passes each distinct input downstream
  only the first time it is seen.

  Membership is tested with contains?, so nil and false inputs are also
  deduplicated (looking the value up by invoking the set would return the
  falsey element itself and make it look unseen)."
  []
  (fn [rf]
    (let [seen (atom #{})]
      (as-rf
       ([acc x]
        (if (contains? @seen x)
          acc
          (do (swap! seen conj x)
              (rf acc x))))
       rf))))
(defn dedupe
  "Returns a stateful transducer that drops an input when it is equal to the
  input immediately before it.

  The previous-value state starts as a namespaced sentinel keyword rather
  than nil, so a leading nil input is passed downstream instead of being
  mistaken for a repeat."
  []
  (fn [rf]
    (let [prev (atom ::none)]
      (as-rf
       ([acc x]
        (if (= @prev x)
          acc
          (do (reset! prev x)
              (rf acc x))))
       rf))))
;; Function which puts together a transducer, the input and the output.
;; xf is the transducer used (or a composition of transducers),
;; f is a reducing function aggregating the output,
;; init is the initial output value (when it is not provided, the default
;; is the result of invoking the 0-arity of f).
;;
(defn transduce
  "Reduces coll with the reducing function obtained by applying the
  transducer xf to f, then calls that function's 1-arity (completion) on
  the result and returns what it returns.

  ([xf f coll])      uses (f) as the initial value.
  ([xf f init coll]) uses init as the initial value.

  f must therefore support the completion arity, and the 0-arity when init
  is omitted."
  ([xf f coll]
   (transduce xf f (f) coll))
  ([xf f init coll]
   (let [rf (xf f)
         result (reduce rf init coll)]
     ;; Completion step: gives every reducing function in the pipeline the
     ;; chance to finalize the accumulated value.
     (rf result))))
(defn map-reduced
  "Returns a seq of (f x) for the elements x of coll, in order, stopping
  right after the first result that is reduced? (that result is included,
  still wrapped). The part after the first element is produced lazily.
  Returns () for an empty coll."
  [f coll]
  (if-let [items (seq coll)]
    (let [head (first items)
          tail (next items)
          v (f head)
          more (if (reduced? v)
                 ()
                 (lazy-seq (map-reduced f tail)))]
      (cons v more))
    ()))
;; Disabled: the #_ reader macro makes the reader discard the whole form
;; below, so this definition is never evaluated. As written it conses (f x)
;; onto a lazily built rest of the sequence, and stops at the first element
;; that is falsey (nil or false) or when the input runs out.
#_(defn append [f [x & xs]]
(if-not x
()
(let [v (f x)]
(cons v
(lazy-seq (append f xs))))))
;;;;;; utilities and wrappers ;;;;;
(defn append
  "Reducing function that collects elements in encounter order.

  ([])        initial value: the empty list.
  ([coll])    completion: returns coll coerced to a (possibly empty) seq.
  ([coll x])  step: returns a vector holding the elements of coll followed
              by x.

  The step accumulates into a vector. Wrapping the accumulator in concat on
  every step would nest one lazy seq per element, and realizing a long
  result built that way overflows the stack."
  ([] ())
  ([coll] (sequence coll))
  ([coll x]
   ;; Only the first step converts the accumulator; after that it is
   ;; already a vector and conj appends in constant time.
   (conj (if (vector? coll) coll (vec coll)) x)))
(defn map
  "Applies f to each element of coll through the mapping transducer,
  collecting the results with append."
  [f coll]
  (let [xf (mapping f)]
    (transduce xf append coll)))
(defn filter
  "Keeps the elements of coll for which f is truthy, using the filtering
  transducer and collecting the results with append."
  [f coll]
  (let [xf (filtering f)]
    (transduce xf append coll)))
(defn mapcat
  "Applies f to each element of coll and collects, with append, every
  element of each returned collection, using the mapcatting transducer."
  [f coll]
  (let [xf (mapcatting f)]
    (transduce xf append coll)))
(defn take
  "Collects, with append, the first n elements of coll, using the taking
  transducer."
  [n coll]
  (let [xf (taking n)]
    (transduce xf append coll)))
(defn drop
  "Collects, with append, the elements of coll after the first n, using the
  dropping transducer."
  [n coll]
  (let [xf (dropping n)]
    (transduce xf append coll)))
(defn take-while
  "Collects, with append, the leading elements of coll for which pred is
  truthy, using the taking-while transducer."
  [pred coll]
  (let [xf (taking-while pred)]
    (transduce xf append coll)))
(defn drop-while
  "Collects, with append, the elements of coll starting at the first one
  for which pred is not truthy, using the dropping-while transducer."
  [pred coll]
  (let [xf (dropping-while pred)]
    (transduce xf append coll)))
(def pipe-ops
  "Demo pipeline: a composition of seven transducers. Each input element
  meets the stages in the order they are listed in the comp call."
  (let [skip-first-million (dropping 1000000)
        next-fifty (taking 50)
        odd-only (filtering odd?)
        low-digits (mapping #(rem % 1000000))
        below-ten (taking-while #(< % 10))
        from-five-on (dropping-while #(< % 5))
        expand-to-ranges (mapcatting range)]
    (comp skip-first-million
          next-fifty
          odd-only
          low-digits
          below-ten
          from-five-on
          expand-to-ranges)))
(defn big-pipe
  "Runs the pipe-ops pipeline over the infinite (range), collecting the
  output with append."
  []
  (let [source (range)]
    (transduce pipe-ops append source)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment