Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Last active August 29, 2015 14:07
Show Gist options
  • Save pchiusano/f3c250cb167d91982b1b to your computer and use it in GitHub Desktop.
Save pchiusano/f3c250cb167d91982b1b to your computer and use it in GitHub Desktop.
Creating diamonds in the scalaz-stream dataflow graph
object tee {
/**
* Convert a `Tee` to a `Process1` by feeding the same input to both sides of the `Tee`.
* Named after the 'diamond' shape this creates in the dataflow graph.
*/
def diamond[A,B](t: Tee[A,A,B]): Process1[A,B] = Process.suspend {
// todo: there's a more complex impl that performs better when branches
// have vastly different rates of processing
val (hd, tl) = t.unemit
if (hd.nonEmpty) Process.emitAll(hd) ++ diamond(tl)
else if (tl.isHalt) tl.asInstanceOf[Process1[A,B]]
else Process.receive1 { (a: A) => diamond(feedR(List(a))(feedL(List(a))(t))) }
}
/**
* Send the same input signal through both `p1` and `p2`, and combine results using `t`.
* Named after the 'diamond' shape this creates in the dataflow graph.
*/
def diamond[A,B,C,D](p1: Process1[A,B], p2: Process1[A,C])(t: Tee[B,C,D]): Process1[A,D] =
diamond { attachR(p2)(attachL(p1)(t)) }
/** Attach a transducer to the left input branch of `t`. */
def attachL[I0,I,I2,O](p: Process1[I0,I])(t: Tee[I,I2,O]): Tee[I0,I2,O] =
wye.attachL(p)(t).asInstanceOf[Tee[I0,I2,O]] // trust me
/** Attach a transducer to the right input branch of `t`. */
def attachR[I,I1,I2,O](p: Process1[I1,I2])(t: Tee[I,I2,O]): Tee[I,I1,O] =
wye.attachR(p)(t).asInstanceOf[Tee[I,I1,O]] // trust me
...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment