Last active
January 22, 2019 14:35
-
-
Save avakhrenev/3325d924f3b47a6f887d95022cce06a2 to your computer and use it in GitHub Desktop.
fs2 runConcat. Evaluate two streams asynchronously, concating result.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.{Effect, IO} | |
import fs2._ | |
import scala.concurrent.ExecutionContext | |
def runConcat[F[_], A](first: Stream[F, A], second: Stream[F, A])( | |
implicit F: Effect[F], | |
ec: ExecutionContext): Stream[F, A] = { | |
type Step = AsyncPull[F, Option[(Segment[A, Unit], Stream[F, A])]] | |
def readFull(s: Step): Pull[F, A, Unit] = | |
s.pull.flatMap { | |
case None => Pull.done | |
case Some((hd, tl)) => Pull.output(hd) >> tl.pull.echo | |
} | |
def go(first: Step, second: Step): Pull[F, A, Unit] = | |
first.race(second).pull.flatMap { | |
case Left(Some((hd, first))) => //reading first stream | |
Pull.output(hd) >> first.pull.unconsAsync.flatMap(go(_, second)) | |
case Left(None) => //first stream has ended, starting to read second | |
readFull(second) | |
case Right(Some((hd, second))) => //Second stream has emitted something | |
hd.uncons1 match { | |
case Left(_) => //that was empty segment, ignore and continue to pull from both streams | |
second.pull.unconsAsync.flatMap(go(first, _)) | |
case Right((hd, tl)) => //Ok, stop reading second stream until first is over | |
readFull(first) >> Pull.output(tl.cons(hd)) >> second.pull.echo | |
} | |
case Right(None) => readFull(first) | |
} | |
first.pull.unconsAsync | |
.flatMap(first => second.pull.unconsAsync.flatMap(second => go(first, second))) | |
.stream | |
} | |
//let's test | |
import ExecutionContext.Implicits._ | |
import scala.concurrent.duration._ | |
Scheduler | |
.apply[IO](3) | |
.flatMap { sch => | |
val a = sch.sleep_[IO](3.seconds) ++ Stream("a", "b", "c") | |
.segmentLimit(1) | |
.flatMap(Stream.segment) | |
.covary[IO] | |
.flatMap(s => sch.sleep_[IO](3.seconds) ++ Stream(s)) | |
val b = sch | |
.awakeEvery[IO](1.seconds) | |
.take(3) | |
.flatMap(_ => Stream.eval_(IO(println("--- WORKING...")))) ++ | |
Stream("d", "e", "f").segmentLimit(1).flatMap(Stream.segment).covary[IO] ++ sch | |
.awakeEvery[IO](1.seconds) | |
.take(3) | |
.flatMap(_ => Stream.eval_(IO(println("--- WORKING...")))) ++ Stream("d", "e", "f") | |
runConcat(a, b) | |
} | |
.evalMap(e => IO(println(e))) | |
.run | |
.unsafeRunSync() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment