Here's a mechanism for transforming a discrete input flow into an output flow where the internal process state can be implemented with loop/recur. As an example, I provide an implementation of "batching with both a maximum size and a maximum delay" referenced in leonoel/missionary#109.
A common pattern for discrete flows is that we want to create an output flow from an input flow, where:
- We process items from the input flow one at a time.
- In response to an input item, we may want to produce zero, one, or multiple output items.
- Often, we have some internal state to decide how to process the items; and so, in addition to producing zero or more output items, we may want to also update our internal state.
We have multiple options for an implementation.
We could write a transducer and then call `m/eduction`. For example, to remove consecutive duplicates we could write a transducer like `clojure.core/dedupe` (https://github.com/clojure/clojure/blob/clojure-1.11.1/src/clj/clojure/core.clj#L7728) and then call `(m/eduction (dedupe) input-flow)`.
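For reference, here is that approach end to end. This is a small sketch assuming missionary is on the classpath; `m/seed` stands in for a real input flow, and the namespace and the `collect` helper are hypothetical names used only for illustration.

```clojure
(ns example.eduction-dedupe
  (:require [missionary.core :as m]))

(defn collect
  "Hypothetical helper: runs a discrete flow to completion and returns its
  values as a vector. On the JVM, m/? outside a process blocks the thread."
  [flow]
  (m/? (m/reduce conj [] flow)))

;; m/seed stands in for a real input flow.
(def input-flow (m/seed [1 1 2 2 2 3 1]))

;; (dedupe) with no arguments returns a transducer that drops consecutive
;; duplicates; m/eduction applies that transducer to a discrete flow.
(collect (m/eduction (dedupe) input-flow))
;; => [1 2 3 1]
```

Only consecutive duplicates are dropped, so the final `1` survives.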
Using the same pattern of imperatively updating the internal state, we could also write the transformer directly using `m/ap`:
(defn mdedupe [f]
  (let [!prior (volatile! ::none)]
    (m/ap
      (let [v (m/?> f)
            prior @!prior]
        (vreset! !prior v)
        (if (= prior v)
          (m/amb)
          v)))))
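A quick way to exercise this version, assuming missionary is on the classpath and using `m/seed` as a stand-in input (the namespace name is hypothetical). The last lines show a consequence of creating the volatile outside `m/ap`: every run of the returned flow shares it.

```clojure
(ns example.mdedupe-ap
  (:require [missionary.core :as m]))

(defn mdedupe [f]
  (let [!prior (volatile! ::none)]   ; created once per call to mdedupe
    (m/ap
      (let [v (m/?> f)               ; fork on each input value, one at a time
            prior @!prior]
        (vreset! !prior v)           ; remember v for the next item
        (if (= prior v)
          (m/amb)                    ; duplicate: produce nothing
          v)))))

(m/? (m/reduce conj [] (mdedupe (m/seed [1 1 2 2 2 3 1]))))
;; => [1 2 3 1]

;; Running the same flow value twice carries the prior over from the first run:
(def twice (mdedupe (m/seed [1 1])))
(m/? (m/reduce conj [] twice))  ;; => [1]
(m/? (m/reduce conj [] twice))  ;; => []
```

Creating the volatile inside `m/ap`, before the fork, should give each run its own state.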
When I have a process with internal state, I like to implement the state using loop/recur. I find there's less cognitive overhead in keeping track of what happens when. For something as simple as "dedupe" it may not make a substantial difference, but for a more complex process it helps keep the logic of the implementation within my mental capacity.
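If I had an `rdv` (from `m/rdv`) that I could use to pull values from the input flow, I could use loop/recur for the state. Here `transmute`, defined below, hands that `rdv` to the transmuter function, which names it `get`: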
(defn mdedupe [f]
  (transmute f
    (fn [get]
      (m/ap
        (loop [prior ::none]
          (if-let [[v] (m/? get)]
            (m/amb
              (if (not= v prior)
                v
                (m/amb))
              (recur v))
            (m/amb)))))))
I pass `nil` to the `rdv` to signal termination of the input flow, and `[v]` to deliver a value from the input flow. This serves as a "maybe" value, where `[nil]` means the input flow has produced a `nil` value, and `nil` means the input flow has terminated.
`if-let` is a convenient way of checking the "maybe" value received from the `rdv`. An `if-let` with a destructuring binding first checks whether the test value is truthy, and, if it is, applies the destructuring to the test value and proceeds to the "true" branch. But if the test value is falsey, the destructuring is skipped and the "false" branch is invoked. With `(if-let [[v] (m/? get)] ... ...)`, a `[nil]` value invokes the "true" branch with `v` set to `nil`, while a `nil` value invokes the "false" branch.
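The "maybe" convention can be checked in plain Clojure, with no missionary involved. `describe` is a hypothetical helper used only to show which branch `if-let` takes:

```clojure
(defn describe
  "Hypothetical helper: interprets a maybe value as delivered over the rdv."
  [maybe]
  (if-let [[v] maybe]
    [:value v]      ; [v] is a vector, so it is truthy even when v is nil or false
    :terminated))   ; nil means the input flow has terminated

(describe [42])     ;; => [:value 42]
(describe [nil])    ;; => [:value nil]
(describe [false])  ;; => [:value false]
(describe nil)      ;; => :terminated
```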
In Clojure, the false branch of an `if` defaults to `nil` if it hasn't been specified. In an `m/ap`, the zero or identity operation could be considered `(m/amb)`, that is, producing no values. It's convenient to have an "if or none" macro:
(defmacro ifn [test then]
  `(if ~test ~then (m/amb)))
Note that I deliberately leave out the option to include an "else" clause, because if we need an "else" clause we can simply use an `if`.
Similarly, I can have an "if-let or none" macro:
(defmacro if-letn [bindings then]
  `(if-let ~bindings ~then (m/amb)))
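A small check of both macros inside `m/ap`, assuming missionary is on the classpath. The macros need to be defined in a namespace that aliases `missionary.core` as `m`, because the syntax-quoted `m/amb` is resolved through that alias at read time. `evens`, `unwrap-maybes`, and `collect` are hypothetical examples:

```clojure
(ns example.macros
  (:require [missionary.core :as m]))

(defmacro ifn [test then]
  `(if ~test ~then (m/amb)))

(defmacro if-letn [bindings then]
  `(if-let ~bindings ~then (m/amb)))

(defn collect [flow]
  (m/? (m/reduce conj [] flow)))

(defn evens
  "Keeps even numbers; odd ones take the implicit (m/amb) branch."
  [xs]
  (m/ap (let [x (m/?> (m/seed xs))]
          (ifn (even? x) x))))

(defn unwrap-maybes
  "Unwraps [v] values and drops nil ones, so [nil] becomes nil."
  [maybes]
  (m/ap (let [maybe (m/?> (m/seed maybes))]
          (if-letn [[v] maybe] v))))

(collect (evens (range 6)))                    ;; => [0 2 4]
(collect (unwrap-maybes [[1] nil [nil] [3]]))  ;; => [1 nil 3]
```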
Now I can write my `mdedupe` as:
(defn mdedupe [f]
  (transmute f
    (fn [get]
      (m/ap
        (loop [prior ::none]
          (if-letn [[v] (m/? get)]
            (m/amb
              (ifn (not= v prior) v)
              (recur v))))))))
As another example, here is an implementation of `mtake`, equivalent to `(m/eduction (take n) f)`:
(defn mtake [n f]
  (transmute f
    (fn [get]
      (m/ap
        (loop [n n]
          (ifn (pos? n)
            (if-letn [[v] (m/? get)]
              (m/amb v (recur (dec n))))))))))
What about the implementation of `transmute`? There are a couple of aspects to consider. We need to consume values from the input flow and provide them to the transmuter over the `rdv`. We also want to cancel the input flow when the output flow terminates, which doesn't happen automatically across an `rdv`.
A convenient way to cancel a process is to use `m/race`. If `cancel` is an `m/dfv`, we can use it to trigger the `m/race` to cancel the process:
(m/race cancel (task-to-consume-input-flow))
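A minimal sketch of the race-against-a-dfv pattern, assuming missionary is on the classpath. `m/sleep` stands in for the long-running process, and `run-with-cancel` is a hypothetical name:

```clojure
(ns example.race-cancel
  (:require [missionary.core :as m]))

(defn run-with-cancel
  "Races a slow task against a dfv and assigns the dfv after 50 ms."
  []
  (let [cancel (m/dfv)
        slow   (m/sleep 60000 :finished)] ; stand-in for a long-running process
    (m/? (m/join vector
                 ;; completes as soon as cancel is assigned; slow is then cancelled
                 (m/race cancel slow)
                 ;; assigns cancel after a short delay
                 (m/sp (m/? (m/sleep 50))
                       (cancel :cancelled)
                       :cancel-sent)))))

(run-with-cancel)
;; => [:cancelled :cancel-sent], after roughly 50 ms rather than 60 s
```

`m/race` completes with the value of the first task to succeed and cancels the rest, so the dfv's value becomes the race's result.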
As a helper, let's write a task to consume the input flow. `(item v)` is run as a task for each value `v` produced by the input flow, and `terminated` is run as a task when the input flow terminates.
(defn on-each [f item terminated]
  (m/reduce (fn [_ _]) nil
    (m/ap
      (m/amb
        (let [v (m/?> f)]
          (m/? (item v))
          (m/amb))
        (do
          (m/? terminated)
          (m/amb))))))
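To see the order in which `on-each` runs its tasks, here is a small sketch that logs into an atom. It assumes missionary is on the classpath; `run-log` is a hypothetical helper, and `on-each` is repeated from above so the block stands alone:

```clojure
(ns example.on-each
  (:require [missionary.core :as m]))

(defn on-each [f item terminated]
  (m/reduce (fn [_ _]) nil
    (m/ap
      (m/amb
        (let [v (m/?> f)]
          (m/? (item v))      ; run the item task for each input value
          (m/amb))
        (do (m/? terminated)  ; after the input terminates, run this once
            (m/amb))))))

(defn run-log
  "Hypothetical helper: runs on-each over coll, logging each step to an atom."
  [coll]
  (let [!log (atom [])]
    (m/? (on-each (m/seed coll)
                  (fn [v] (m/sp (swap! !log conj v)))
                  (m/sp (swap! !log conj :done))))
    @!log))

(run-log [1 2 3])  ;; => [1 2 3 :done]
```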
Now the implementation of "transmute" is perhaps a bit clunky, but straightforward:
(defn transmute [flow transmuter]
  (let [in (m/rdv)
        cancel (m/dfv)]
    (m/ap
      (m/amb=
        (do
          (m/?
            (m/race
              cancel
              (on-each flow #(in [%]) (in nil))))
          (m/amb))
        (m/amb
          (m/?> (transmuter in))
          (do (cancel true) (m/amb)))))))
This has two parts, the two branches of the `m/amb=`:
- The first part delivers values from the input flow to the transmuter, and cancels the input flow if `cancel` is assigned. It's part of the `m/ap` through the `m/amb=` so that cancellation of the output flow propagates to the task, but it doesn't itself produce any values to the output flow.
- The second part receives and produces values from the transmuter, and assigns `cancel` when the transmuter terminates.
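Putting the pieces together, here is a self-contained sketch that restates `ifn`, `if-letn`, `on-each`, `transmute`, `mdedupe`, and `mtake` from this post and runs them against `m/seed` inputs. It assumes missionary is on the classpath; the namespace and the `collect` helper are hypothetical.

```clojure
(ns example.transmute
  (:require [missionary.core :as m]))

(defmacro ifn [test then]
  `(if ~test ~then (m/amb)))

(defmacro if-letn [bindings then]
  `(if-let ~bindings ~then (m/amb)))

(defn on-each [f item terminated]
  (m/reduce (fn [_ _]) nil
    (m/ap
      (m/amb
        (let [v (m/?> f)]
          (m/? (item v))
          (m/amb))
        (do (m/? terminated)
            (m/amb))))))

(defn transmute [flow transmuter]
  (let [in     (m/rdv)
        cancel (m/dfv)]
    (m/ap
      (m/amb=
        ;; feed [v] per input value, then nil, until cancel is assigned
        (do (m/? (m/race cancel (on-each flow #(in [%]) (in nil))))
            (m/amb))
        ;; emit the transmuter's values, then assign cancel
        (m/amb
          (m/?> (transmuter in))
          (do (cancel true) (m/amb)))))))

(defn mdedupe [f]
  (transmute f
    (fn [get]
      (m/ap
        (loop [prior ::none]
          (if-letn [[v] (m/? get)]
            (m/amb
              (ifn (not= v prior) v)
              (recur v))))))))

(defn mtake [n f]
  (transmute f
    (fn [get]
      (m/ap
        (loop [n n]
          (ifn (pos? n)
            (if-letn [[v] (m/? get)]
              (m/amb v (recur (dec n))))))))))

(defn collect [flow]
  (m/? (m/reduce conj [] flow)))

(collect (mdedupe (m/seed [1 1 2 2 nil nil 3])))  ;; => [1 2 nil 3]
(collect (mtake 3 (m/seed (range 10))))           ;; => [0 1 2]
(collect (mtake 5 (m/seed [:a :b])))              ;; => [:a :b]
```

The `nil` values in the first case travel as `[nil]` over the rdv, so they are deduplicated like any other value. The `mtake 3` case exercises the cancellation path: once the transmuter terminates, `cancel` is assigned and `m/race` cancels `on-each`, which would otherwise keep trying to hand values to an rdv no one is taking.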
As another example, we can consider an alternative implementation for leonoel/missionary#109.
The goal is to create a "batch" process, where there's both a maximum number of items to include in a batch and also a maximum latency where a smaller batch is produced if necessary to avoid holding on to any item for too long.
This is a discrete process, where every input value appears in the output flow in one batch or another.
Following the "maybe a bit clunky, but straightforward" approach, we have two kinds of events here: a value is produced by the input flow, and a value reaches its timeout, at which point it needs to be emitted in a batch if it hasn't been already.
If we had a discrete flow of these events, the batch process itself is easy: we can process the events one at a time. As values come in we add them to a bucket. We emit a batch when:
- The bucket is now full after adding a value;
- We receive a timeout for a value which is in the bucket; or,
- The input flow terminates and the bucket isn't empty.
For convenience, we can enumerate the input values so we can easily tell which value a timeout is for.
a b c d ... ⇒ [0 a] [1 b] [2 c] [3 d] ...
(defn enumerate [f]
  (m/eduction (map-indexed vector) f))
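A quick check of `enumerate`, assuming missionary is on the classpath (the namespace name is hypothetical):

```clojure
(ns example.enumerate
  (:require [missionary.core :as m]))

(defn enumerate [f]
  ;; map-indexed as a transducer pairs each value with its 0-based index
  (m/eduction (map-indexed vector) f))

(m/? (m/reduce conj [] (enumerate (m/seed [:a :b :c :d]))))
;; => [[0 :a] [1 :b] [2 :c] [3 :d]]
```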
Now we want to mix in timeouts:
[0 a] [1 b] [2 c] ... ⇒ [0 :v a] [1 :v b] [0 :timeout] [2 :v c] ...
Here's a buggy implementation:
(defn add-timeouts [ms f]
  (m/ap
    (let [[i v] (m/?> ##Inf f)]
      (m/amb
        [i :v v]
        (do (m/? (m/sleep ms))
            [i :timeout])))))
The problem is that if the input flow produces values synchronously, as `m/seed` does (i.e. the values are produced "at the same time", without a delay between them), the output values can appear in a different order.
It's not a problem for us if timeouts for values that were produced "at the same time" appear out of order, but we do need the original values to appear in the output flow in the same order as the input flow.
We can pull out the delay part:
(defn add-delay [ms f]
  (m/ap
    (let [v (m/?> ##Inf f)]
      (m/? (m/sleep ms))
      v)))
This one can also produce "at the same time" values out of order, but that's OK because we'll only be using it for the timeouts. (We could write a version of `add-delay` that always produces values in the correct order, but it's not needed for this example.)
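A small sketch of `add-delay` on its own, assuming missionary is on the classpath. Because the output order isn't guaranteed, the hypothetical `timed-run` helper compares results as a set; it also reports elapsed time, which should be close to one delay rather than three, since the sleeps run concurrently:

```clojure
(ns example.add-delay
  (:require [missionary.core :as m]))

(defn add-delay [ms f]
  (m/ap
    (let [v (m/?> ##Inf f)]   ; fork every input value concurrently
      (m/? (m/sleep ms))      ; each branch sleeps independently
      v)))

(defn timed-run
  "Hypothetical helper: runs add-delay over coll and reports the values
  (as a set, since order isn't guaranteed) and the elapsed milliseconds."
  [ms coll]
  (let [start (System/nanoTime)
        out   (m/? (m/reduce conj [] (add-delay ms (m/seed coll))))]
    {:values     (set out)
     :elapsed-ms (quot (- (System/nanoTime) start) 1000000)}))

(timed-run 100 [:a :b :c])
;; :values is #{:a :b :c}; :elapsed-ms is typically a little over 100, not 300
```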
We mix in the timeouts with the original values:
(defn add-timeouts [ms f]
  (let [s (m/stream f)]
    (m/ap
      (m/amb=
        (m/?> (m/eduction (map (fn [[i v]] [i :v v])) s))
        (m/?> (add-delay ms (m/eduction (map (fn [[i _]] [i :timeout])) s)))))))
OK, now we have the stream of values and timeouts:
[0 :v a] [1 :v b] [0 :timeout] [2 :v c] ...
With the events of interest supplied for us, the logic of the batch process is simple:
- As items come in, we add them to the bucket.
- If after adding an item the bucket is now full, emit the bucket as a batch.
- Keep track of the index of the oldest item in the bucket. E.g., if a, b, and c are in the bucket, that would be 0; if only c is in the bucket, that would be 2, and so on.
- If we receive a timeout with the index of an item still in the bucket, emit the bucket as a batch.
- Finally, when the input stream terminates and the bucket isn't empty, we emit the remaining items as a batch.
(defn batch [max-size max-delay f]
  (transmute (add-timeouts max-delay (enumerate f))
    (fn [get]
      (m/ap
        (loop [bucket [], oldest nil]
          (if-let [[[i kind v]] (m/? get)]
            (case kind
              :v
              (let [bucket2 (conj bucket v)]
                (if (>= (count bucket2) max-size)
                  (m/amb bucket2 (recur [] nil))
                  (recur bucket2 (or oldest i))))
              :timeout
              (if (and oldest (>= i oldest))
                (m/amb bucket (recur [] nil))
                (recur bucket oldest)))
            (if (= bucket [])
              (m/amb)
              bucket)))))))
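Finally, a self-contained sketch that restates the pieces from this post (`on-each`, `transmute`, `enumerate`, `add-delay`, `add-timeouts`, and `batch`) and runs two cases: one where batches fill up, and one where the delay forces a partial batch. It assumes a missionary version that provides `m/stream`, as the post does; `collect` and `spaced` are hypothetical helpers, and the timing-based case relies on 100 ms and 300 ms being far enough apart on the machine running it.

```clojure
(ns example.batch
  (:require [missionary.core :as m]))

(defn on-each [f item terminated]
  (m/reduce (fn [_ _]) nil
    (m/ap
      (m/amb
        (let [v (m/?> f)]
          (m/? (item v))
          (m/amb))
        (do (m/? terminated)
            (m/amb))))))

(defn transmute [flow transmuter]
  (let [in     (m/rdv)
        cancel (m/dfv)]
    (m/ap
      (m/amb=
        (do (m/? (m/race cancel (on-each flow #(in [%]) (in nil))))
            (m/amb))
        (m/amb
          (m/?> (transmuter in))
          (do (cancel true) (m/amb)))))))

(defn enumerate [f]
  (m/eduction (map-indexed vector) f))

(defn add-delay [ms f]
  (m/ap
    (let [v (m/?> ##Inf f)]
      (m/? (m/sleep ms))
      v)))

(defn add-timeouts [ms f]
  (let [s (m/stream f)]
    (m/ap
      (m/amb=
        (m/?> (m/eduction (map (fn [[i v]] [i :v v])) s))
        (m/?> (add-delay ms (m/eduction (map (fn [[i _]] [i :timeout])) s)))))))

(defn batch [max-size max-delay f]
  (transmute (add-timeouts max-delay (enumerate f))
    (fn [get]
      (m/ap
        (loop [bucket [], oldest nil]
          (if-let [[[i kind v]] (m/? get)]
            (case kind
              :v (let [bucket2 (conj bucket v)]
                   (if (>= (count bucket2) max-size)
                     (m/amb bucket2 (recur [] nil))    ; full: emit now
                     (recur bucket2 (or oldest i))))
              :timeout (if (and oldest (>= i oldest))  ; item still in bucket
                         (m/amb bucket (recur [] nil))
                         (recur bucket oldest)))
            (if (= bucket []) (m/amb) bucket)))))))    ; input ended

(defn collect [flow]
  (m/? (m/reduce conj [] flow)))

(defn spaced
  "Hypothetical input: 1 and 2 immediately, then 3 after 300 ms."
  []
  (m/ap (let [x (m/?> (m/seed [1 2 3]))]
          (when (= x 3) (m/? (m/sleep 300)))
          x)))

;; Size-triggered: values arrive together, so batches fill to max-size;
;; the trailing [5] is flushed by its timeout.
(collect (batch 2 100 (m/seed [1 2 3 4 5])))  ;; => [[1 2] [3 4] [5]]

;; Delay-triggered: [1 2] is flushed by a timeout before 3 arrives.
(collect (batch 10 100 (spaced)))              ;; => [[1 2] [3]]
```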