Skip to content

Instantly share code, notes, and snippets.

@fr33m0nk
Created October 17, 2022 13:47
Show Gist options
  • Save fr33m0nk/200705b84d38f50ee4373b1652c2fedc to your computer and use it in GitHub Desktop.
Save fr33m0nk/200705b84d38f50ee4373b1652c2fedc to your computer and use it in GitHub Desktop.
Functions for creating sequential and parallel streams. Parallel streams are very useful when working with Datomic's `d/datom` api
(ns fr33m0nk.clj-java-streams.core
(:refer-clojure :exclude [filter map distinct take drop sort take-while drop-while concat])
(:import
(java.util Collection Comparator)
(java.util.function BinaryOperator Consumer Function Predicate)
(java.util.stream BaseStream Collectors Stream StreamSupport)))
(defn iterable?
[coll]
(let [class-of-coll (class coll)]
(isa? class-of-coll Iterable)))
(defn implements-collection?
[coll]
(let [class-of-coll (class coll)]
(isa? class-of-coll Collection)))
(defn stream-of
^Stream
[coll]
(Stream/of (to-array coll)))
(defn coll->stream
"Converts collection implementing java.lang.Iterable to parallel Java stream"
^Stream
[^Iterable coll]
(assert (iterable? coll) "Incorrect Collection type. Collection should implement java.lang.Iterable")
(if (implements-collection? coll)
(-> ^Collection coll (.stream))
(-> ^Iterable coll (.spliterator) (StreamSupport/stream false))))
(defn coll->parallel-stream
"Converts collection implementing java.lang.Iterable to parallel Java stream"
^Stream
[^Iterable coll]
(assert (iterable? coll) "Incorrect Collection type. Collection should implement java.lang.Iterable")
(if (implements-collection? coll)
(-> ^Collection coll (.parallelStream))
(-> ^Iterable coll (.spliterator) (StreamSupport/stream true))))
(defn filter
^Stream
[^Predicate pred ^Stream stream]
(.filter stream pred))
(defn map
^Stream
[^Function mapper ^Stream stream]
(.map stream mapper))
(defn flat-map
^Stream
[^Function mapper ^Stream stream]
(.flatMap stream mapper))
(defn distinct
^Stream
[^Stream stream]
(.distinct stream))
(defn sort
^Stream
([^Stream stream]
(.sorted stream))
([^Comparator comparator ^Stream stream]
(.sorted stream comparator)))
(defn take
^Stream
[^Long size ^Stream stream]
(.limit stream size))
(defn drop
^Stream
[^Long size ^Stream stream]
(.skip stream size))
(defn take-while
^Stream
[^Predicate pred ^Stream stream]
(.takeWhile stream pred))
(defn drop-while
^BaseStream
[^Predicate pred ^BaseStream stream]
(.dropWhile stream pred))
(defn concat
^Stream
[^Stream stream-a ^Stream stream-b]
(Stream/concat stream-a stream-b))
(defn to-vector
[^Stream stream]
(-> stream
(.collect (Collectors/toList))
vec))
(defn to-set
[^Stream stream]
(-> stream
(.collect (Collectors/toSet))
set))
(defn to-map
([^Function key-mapper ^Function value-mapper ^Stream stream]
(-> stream
(.collect (Collectors/toMap key-mapper value-mapper))
((partial into {}))))
([^Function key-mapper ^Function value-mapper ^BinaryOperator mergeFunction ^Stream stream]
(-> stream
(.collect (Collectors/toMap key-mapper value-mapper mergeFunction))
((partial into {})))))
(defn for-each
[^Consumer action ^Stream stream]
(.forEach stream action))
@fr33m0nk
Copy link
Author

Using in conjunction with power-dot or cljj or clambda eliminates most of above boilerplate.

@fr33m0nk
Copy link
Author

(require '[power-dot.core :as dot])

(dot/.. (coll->parallel-stream (d/datoms db :avet <<attribute>>))
                                (map (dot/as-fn :e))
                                (map <<mapping function>>>)
                                (filter some?)
                                (collect (Collectors/toList)))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment