Example pipeline model: http://imgur.com/a/G7qEy
Each variable gets its own copy of this model. Only some variables need the
bootstrap function; the rest run as a plain sequential pipeline.
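The sequential case can be sketched with core.async as three stages joined by channels. This is a minimal illustration and not the code from this gist: the namespace and the names `stage`, `sink`, and `run-sketch` are hypothetical, a pre-filled channel stands in for the chime schedule, and an atom stands in for the database. Unlike the gist's stages, each stage here closes its output channel when its input closes, an added assumption so the run can terminate.

```clojure
(ns pipeline-sketch
  (:require [clojure.core.async :as a :refer [<! >! <!! >!! go-loop chan close!]]))

;; Hypothetical helper: read from `in`, apply `f`, write to `out`.
;; Closes `out` once `in` is closed and drained, so shutdown flows downstream.
(defn stage [f in out]
  (go-loop []
    (if-some [v (<! in)]
      (do (>! out (f v))
          (recur))
      (close! out))))

;; Hypothetical helper: call `f` on every value from `ch`.
;; Returns a channel that yields the number of values consumed.
(defn sink [f ch]
  (go-loop [n 0]
    (if-some [v (<! ch)]
      (do (f v)
          (recur (inc n)))
      n)))

(defn run-sketch []
  (let [ticks  (chan 3)      ; stands in for the chime channel
        in     (chan)
        out    (chan)
        stored (atom [])]    ; stands in for the database
    (doseq [t [1 2 3]] (>!! ticks t))
    (close! ticks)
    (stage identity ticks in)                  ; forward ticks into the pipeline
    (stage (fn [t] {:vals (* 10 t)}) in out)   ; process each tick
    (<!! (sink #(swap! stored conj %) out))    ; persist, blocking until drained
    @stored))
```

Calling `(run-sketch)` pushes three ticks through the stages and returns what the sink collected. Because every stage closes its output, closing the first channel is enough to shut the whole chain down.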
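;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Forecast generation
;;
;; The ns form for this file is not shown. Assumed aliases: `t` = clj-time.core,
;; `c` = clj-time.coerce, `p` = clj-time.periodic. `without-minutes` is not
;; defined in this file.

;; Returns 24 [sql-timestamp value] pairs, one per hour for the 24 hours after
;; `inst`, each with a random value in [0, 50).
(defn inst->forecasts [inst]
  (for [r (range 1 25)]
    [(c/to-sql-time (t/plus (without-minutes inst) (t/hours r))) (rand-int 50)]))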

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Async

(defn process-fn [inst]
  {:vals (inst->forecasts inst)})

;; There is a launch function for each variable of interest; this one is the
;; "forecast" launcher. Other namespaces hold the launch functions for
;; "variable 1", "variable 2", etc. Only `schedule`, `process-fn`, and the
;; persist function (`insert-forecasts` here) are unique to each job.
;; The roles of `in`, `out`, and `chime` are shown in the pipeline model image
;; linked in the `pipeline.md` file included in this gist.
(defn launch []
  (let [in       (chan)
        out      (chan)
        ;; One tick per minute, counted from the top of the current hour.
        schedule (p/periodic-seq
                  (let [n (t/now)]
                    (t/date-time (t/year n) (t/month n) (t/day n) (t/hour n) 0))
                  (t/minutes 1))
        chime    (chime-ch schedule)]
    ;; Start the consumers first, then the producer.
    ((persist insert-forecasts db) out)
    ((process process-fn) in out)
    (realtime chime in)
    ;; Return the channels so the caller can close them.
    [in out chime]))
(ns ngin.utils
  (:require [clojure.core.async :refer [<! >! go]]))

;; Forwards every value from `ch` (the chime channel) onto `in` until `ch`
;; closes. `in` itself is left open.
(defn realtime [ch in]
  (println (str ">>> started `realtime` in " *ns*))
  (go (loop []
        (when-let [v (<! ch)]
          (>! in v)
          (recur)))
      (println (str ">>> closing `realtime` in " *ns*))))
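
;; Returns a fn that applies `f` to each value from `in` and puts the result
;; on `out` until `in` closes. `f` must not return nil, since nil cannot be
;; put on a channel.
(defn process [f]
  (fn [in out]
    (println (str ">>> started `process` in " *ns*))
    (go (loop []
          (when-let [inst (<! in)]
            (>! out (f inst))
            (recur)))
        (println (str ">>> closing `process` in " *ns*)))))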
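
;; Returns a fn that calls `(f db v)` for each value from `ch` until `ch`
;; closes. `f` runs inside the go block, so a slow, blocking `f` ties up a
;; go-pool thread while it runs.
(defn persist [f db]
  (fn [ch]
    (println (str ">>> started `persist` in " *ns*))
    (go (loop []
          (when-let [v (<! ch)]
            (f db v)
            (recur)))
        (println (str ">>> closing `persist` in " *ns*)))))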