Last active
August 29, 2015 14:07
-
-
Save pchiusano/f3c250cb167d91982b1b to your computer and use it in GitHub Desktop.
Creating diamonds in the scalaz-stream dataflow graph
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object tee { | |
/**
 * Turn a `Tee` into a `Process1` by routing every incoming element to both
 * the left and the right input of the `Tee`.
 * The name refers to the 'diamond' shape this produces in the dataflow graph.
 *
 * @param t the `Tee` whose two inputs are both fed from the single input stream
 * @return a `Process1` that emits the output produced by `t`
 */
def diamond[A,B](t: Tee[A,A,B]): Process1[A,B] = Process.suspend {
  // todo: there's a more complex impl that performs better when branches
  // have vastly different rates of processing
  t.unemit match {
    // `t` already has output available: emit it, then carry on with the remainder.
    case (ready, rest) if ready.nonEmpty =>
      Process.emitAll(ready) ++ diamond(rest)
    // Nothing to emit and the remainder has halted: reuse it as the result
    // (unchecked cast from the `Tee` type to `Process1`).
    case (_, rest) if rest.isHalt =>
      rest.asInstanceOf[Process1[A,B]]
    // Otherwise wait for one element and feed it to the left side, then the right side.
    // NOTE(review): `receive1` is used without a fallback, so what happens to `t`
    // when the input ends is not visible here -- confirm this is intended.
    case _ =>
      Process.receive1 { (a: A) =>
        val fedLeft = feedL(List(a))(t)
        diamond(feedR(List(a))(fedLeft))
      }
  }
}
/**
 * Run the same input through both `p1` and `p2`, then combine their outputs with `t`.
 * The name refers to the 'diamond' shape this produces in the dataflow graph.
 *
 * @param p1 transducer attached to the left input branch of `t`
 * @param p2 transducer attached to the right input branch of `t`
 * @param t  the `Tee` that combines the outputs of `p1` and `p2`
 * @return a `Process1` built by passing the fully attached `Tee` to the
 *         single-argument `diamond`
 */
def diamond[A,B,C,D](p1: Process1[A,B], p2: Process1[A,C])(t: Tee[B,C,D]): Process1[A,D] = {
  // Attach the left branch first, then the right, so both inputs of the
  // resulting `Tee` accept `A`.
  val withLeft = attachL(p1)(t)
  val withBoth = attachR(p2)(withLeft)
  diamond(withBoth)
}
/**
 * Attach a transducer to the left input branch of `t`.
 *
 * @param p transducer placed in front of the left input of `t`
 * @param t the `Tee` whose left input is being adapted
 * @return `t` with its left input type changed from `I` to `I0`
 */
def attachL[I0,I,I2,O](p: Process1[I0,I])(t: Tee[I,I2,O]): Tee[I0,I2,O] = {
  // Delegates to `wye.attachL`; the result is cast back to a `Tee` without a
  // runtime check ("trust me"), presumably because the delegate is typed more
  // generally -- verify against `wye.attachL`.
  val attached = wye.attachL(p)(t)
  attached.asInstanceOf[Tee[I0,I2,O]]
}
/**
 * Attach a transducer to the right input branch of `t`.
 *
 * @param p transducer placed in front of the right input of `t`
 * @param t the `Tee` whose right input is being adapted
 * @return `t` with its right input type changed from `I2` to `I1`
 */
def attachR[I,I1,I2,O](p: Process1[I1,I2])(t: Tee[I,I2,O]): Tee[I,I1,O] = {
  // Delegates to `wye.attachR`; the result is cast back to a `Tee` without a
  // runtime check ("trust me"), presumably because the delegate is typed more
  // generally -- verify against `wye.attachR`.
  val attached = wye.attachR(p)(t)
  attached.asInstanceOf[Tee[I,I1,O]]
}
... | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment