@awwx
Last active August 5, 2024 08:27
transmute

Here's a mechanism for transforming a discrete input flow into an output flow where the internal process state can be implemented with loop/recur. As an example I provide an implementation of "batching with both a maximum size and a maximum delay" referenced in leonoel/missionary#109.

Transforming discrete flows

A common pattern for discrete flows is that we want to create an output flow from an input flow, where:

  • We process items from the input flow one at a time.
  • In response to an input item, we may want to produce zero, one, or multiple output items.
  • Often, we have some internal state to decide how to process the items; and so, in addition to producing zero or more output items, we may want to also update our internal state.

We have multiple options for an implementation.

Transducers

We could write a transducer and then call m/eduction. For example, if we wanted to remove consecutive duplicates, we could use a transducer like clojure.core's dedupe (https://github.com/clojure/clojure/blob/clojure-1.11.1/src/clj/clojure/core.clj#L7728), and then call (m/eduction (dedupe) input-flow).
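To make the "imperatively updated state" of such a transducer concrete, here is a minimal sketch of a dedupe-style transducer. The name dedupe-xf is hypothetical, and this is a simplified stand-in written for illustration, not the clojure.core source; it only assumes plain Clojure, so it can be tried on a vector without missionary.

```clojure
;; Hypothetical, simplified stand-in for a dedupe transducer.
;; The previous input is kept in a volatile and updated imperatively.
(defn dedupe-xf []
  (fn [rf]
    (let [!prior (volatile! ::none)]
      (fn
        ([] (rf))                       ; init
        ([result] (rf result))          ; completion
        ([result input]                 ; step
         (let [prior @!prior]
           (vreset! !prior input)
           (if (= prior input)
             result                     ; consecutive duplicate: emit nothing
             (rf result input))))))))

(into [] (dedupe-xf) [1 1 2 2 3 1])
;=> [1 2 3 1]
```

The same function could be passed to m/eduction in place of (dedupe).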

m/ap with imperative state

Using the same pattern of imperatively updating the internal state, we could also write the transformer directly using m/ap:

(defn mdedupe [f]
  (let [!prior (volatile! ::none)]
    (m/ap
     (let [v (m/?> f)
           prior @!prior]
       (vreset! !prior v)
       (if (= prior v)
         (m/amb)
         v)))))

loop/recur

When I have a process with internal state, I like to implement the state using loop/recur. I find that, for me, there's less cognitive overhead in keeping track of what's happening when. For something as simple as "dedupe" it may not make a substantial difference, but for a more complex process it can help avoid exceeding my mental capacity in following the logic of the implementation.

If I had an rdv I could use to pull values from the input flow, I could use loop/recur for the state:

(defn mdedupe [f]
  (transmute f
    (fn [get]
      (m/ap
       (loop [prior ::none]
         (if-let [[v] (m/? get)]
           (m/amb
            (if (not= v prior)
              v
              (m/amb))
            (recur v))
           (m/amb)))))))

I pass nil to the rdv to signal termination of the input flow, and [v] to deliver a value from the input flow. This serves as a "maybe" value, where [nil] means the input flow has produced a nil value, and nil means the input flow has terminated.

if-let is a convenient way of checking the "maybe" value received from the rdv. An if-let with a destructuring binding first checks whether the test value is truthy, and, if it is, it applies the destructuring to the test value and proceeds to the "true" branch. But, if the test value is falsey, the destructuring is skipped and the "false" branch is invoked.

With (if-let [[v] (m/? get)] ... ...), a [nil] value invokes the "true" branch with v set to nil, while a nil value invokes the "false" branch.
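The behavior of if-let on the "maybe" value can be checked in plain Clojure, without an rdv. In this small sketch, describe-maybe is a hypothetical helper that exists only for illustration:

```clojure
;; Hypothetical helper: shows which branch if-let takes for each "maybe" value.
(defn describe-maybe [maybe]
  (if-let [[v] maybe]
    [:value v]      ; maybe was a one-element vector, v is its content
    :terminated))   ; maybe was nil

(describe-maybe [42])  ;=> [:value 42]
(describe-maybe [nil]) ;=> [:value nil]
(describe-maybe nil)   ;=> :terminated
```

The [nil] case works because the truthiness test is applied to the vector itself, which is truthy, before the destructuring binds v.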

"if or none" macros

In Clojure, the false branch of an if defaults to nil if it hasn't been specified. In an m/ap, the zero or identity operation could be considered (m/amb), that is, to produce no values. It's convenient to have an "if or none" macro:

(defmacro ifn [test then]
  `(if ~test ~then (m/amb)))

Note that I deliberately leave out the option to include an "else" clause, because if we have an "else" clause we can simply use an if.

Similarly I can have an "if-let or none" macro:

(defmacro if-letn [bindings then]
  `(if-let ~bindings ~then (m/amb)))

Now I can write my mdedupe as:

(defn mdedupe [f]
  (transmute f
    (fn [get]
      (m/ap
       (loop [prior ::none]
         (if-letn [[v] (m/? get)]
           (m/amb
            (ifn (not= v prior) v)
            (recur v))))))))

mtake

As another example, an implementation of mtake, equivalent to (m/eduction (take n) f):

(defn mtake [n f]
  (transmute f
    (fn [get]
      (m/ap
       (loop [n n]
         (ifn (pos? n)
           (if-letn [[v] (m/? get)]
             (m/amb v (recur (dec n))))))))))

transmute

What about the implementation of transmute? We have a couple of aspects to consider. We need to consume values from the input flow and provide them to the transmuter over the rdv. We also want to cancel the input flow when the output flow terminates, which doesn't happen automatically across an rdv.

A convenient way to cancel a process is to use m/race. If cancel is an m/dfv, we can use that to trigger the m/race to cancel the process: (m/race cancel (task-to-consume-input-flow))

As a helper, let's write a task to consume the input flow.

(item v) is run as a task for each value v produced by the input flow. terminated is run as a task when the input flow terminates.

(defn on-each [f item terminated]
  (m/reduce (fn [_ _]) nil
    (m/ap
     (m/amb
      (let [v (m/?> f)]
        (m/? (item v))
        (m/amb))
      (do
        (m/? terminated)
        (m/amb))))))

Now the implementation of "transmute" is perhaps a bit clunky, but straightforward:

(defn transmute [flow transmuter]
  (let [in (m/rdv)
        cancel (m/dfv)]
    (m/ap
     (m/amb=
      (do
        (m/?
         (m/race
          cancel
          (on-each flow #(in [%]) (in nil))))
        (m/amb))
      (m/amb
       (m/?> (transmuter in))
       (do (cancel true) (m/amb)))))))

This has two parts, the two branches of the m/amb=:

The first part delivers values from the input flow to the transmuter, and cancels the input flow if cancel is assigned. It's part of the m/ap through the m/amb= so that cancellation of the output flow propagates to the task, but it doesn't itself produce any values to the output flow.

The second part receives and produces values from the transmuter, and assigns cancel when the transmuter terminates.

batch

As another example, we can consider an alternative implementation for leonoel/missionary#109.

The goal is to create a "batch" process, where there's both a maximum number of items to include in a batch and also a maximum latency where a smaller batch is produced if necessary to avoid holding on to any item for too long.

This is a discrete process, where every input value appears in the output flow in one batch or another.

Following the "maybe a bit clunky, but straightforward" approach, we have two kinds of events here: a value is produced from the input flow, and a value has reached its timeout where it needs to be produced in a batch if it hasn't already.

If we had a discrete flow of these events, the batch process itself is easy: we can process the events one at a time. As values come in we add them to a bucket. We emit a batch when:

  • The bucket is now full after adding a value;
  • We receive a timeout for a value which is in the bucket; or,
  • The input flow terminates and the bucket isn't empty.

enumerate

For convenience, we can enumerate the input values so we can easily tell which value a timeout is for.

a b c d... ⇒ [0 a] [1 b] [2 c] [3 d] ...

(defn enumerate [f]
  (m/eduction (map-indexed vector) f))
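Since (map-indexed vector) is an ordinary transducer, its effect can be seen on a plain collection, independent of any flow:

```clojure
;; The same transducer enumerate passes to m/eduction, applied to a vector.
(into [] (map-indexed vector) [:a :b :c :d])
;=> [[0 :a] [1 :b] [2 :c] [3 :d]]
```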

add-timeouts

Now we want to mix in timeouts:

[0 a] [1 b] [2 c] ... ⇒ [0 :v a] [1 :v b] [0 :timeout] [2 :v c] ...

Here's a buggy implementation:

(defn add-timeouts [ms f]
  (m/ap
   (let [[i v] (m/?> ##Inf f)]
     (m/amb
      [i :v v]
      (do (m/? (m/sleep ms))
          [i :timeout])))))

The problem is that if the input flow produced values synchronously, such as by m/seed (i.e. the values were produced "at the same time", without a delay between them), the output values can appear in a different order.

It's not a problem for us if timeouts for values that were produced "at the same time" appear out of order, but we do need the original values to appear in the output flow in the same order as the input flow.

We can pull out the delay part:

(defn add-delay [ms f]
  (m/ap
   (let [v (m/?> ##Inf f)]
     (m/? (m/sleep ms))
     v)))

This one can also produce "at the same time" values out of order, but that's OK because we'll only be using it with the timeouts.

(We can write a version of add-delay which always produces values in the correct order, but it's not needed for this example).

We mix in the timeouts with the original values:

(defn add-timeouts [ms f]
  (let [s (m/stream f)]
    (m/ap
     (m/amb=
      (m/?> (m/eduction (map (fn [[i v]] [i :v v])) s))
      (m/?> (add-delay ms (m/eduction (map (fn [[i v]] [i :timeout])) s)))))))

batch implementation

OK, now we have the stream of values and timeouts:

[0 :v a] [1 :v b] [0 :timeout] [2 :v c] ...

With the events of interest supplied for us, the logic of the batch process is simple:

  • As items come in, we add them to the bucket.
  • If after adding an item the bucket is now full, emit the bucket as a batch.
  • Keep track of the index of the oldest item in the bucket. E.g., if a, b, and c are in the bucket, that would be 0; if only c is in the bucket, that would be 2, and so on.
  • If we receive a timeout with the index of an item still in the bucket, emit the bucket as a batch.
  • Finally, when the input stream terminates and the bucket isn't empty, we emit the remaining items as a batch.

(defn batch [max-size max-delay f]
  (transmute (add-timeouts max-delay (enumerate f))
    (fn [get]
      (m/ap
       (loop [bucket [], oldest nil]
        (if-let [[[i kind v]] (m/? get)]
          (case kind
            :v
            (let [bucket2 (conj bucket v)]
              (if (>= (count bucket2) max-size)
                (m/amb bucket2 (recur [] nil))
                (recur bucket2 (or oldest i))))

            :timeout
            (if (and oldest (>= i oldest))
              (m/amb bucket (recur [] nil))
              (recur bucket oldest)))
          (if (= bucket [])
            (m/amb)
            bucket)))))))
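To check the batching rules without running any flows or timers, the same decisions can be modeled as a pure function over a vector of events. This is only a sketch: batch-events is a hypothetical name, it takes the events as a finished collection rather than pulling them from an rdv, and it collects the batches into a vector instead of emitting them with m/amb.

```clojure
;; Hypothetical pure model of the loop inside batch.
;; events is a collection of [i :v value] and [i :timeout] entries.
(defn batch-events [max-size events]
  (loop [events events, bucket [], oldest nil, out []]
    (if-let [[[i kind v] & more] (seq events)]
      (case kind
        :v
        (let [bucket2 (conj bucket v)]
          (if (>= (count bucket2) max-size)
            (recur more [] nil (conj out bucket2))      ; bucket is full
            (recur more bucket2 (or oldest i) out)))

        :timeout
        (if (and oldest (>= i oldest))
          (recur more [] nil (conj out bucket))         ; item still in bucket
          (recur more bucket oldest out)))              ; item already emitted
      ;; no more events: flush whatever is left
      (if (empty? bucket) out (conj out bucket)))))

(batch-events 3 [[0 :v :a] [1 :v :b] [0 :timeout] [1 :timeout]
                 [2 :v :c] [3 :v :d] [4 :v :e] [5 :v :f]])
;=> [[:a :b] [:c :d :e] [:f]]
```

Here the first batch is cut short by the timeout for item 0, the timeout for item 1 is ignored because that item has already been emitted, the second batch is emitted on reaching the maximum size, and the last one is flushed at termination.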