Created
December 13, 2019 16:45
-
-
Save thobson/5dc00dc03f7d8656422d62427ee0f5e7 to your computer and use it in GitHub Desktop.
Monix vs fs2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example | |
import java.text.NumberFormat | |
import java.time.Instant | |
import cats.effect.{IO, Sync, Timer} | |
import fs2.{Chunk, Stream} | |
import monix.eval.Task | |
import monix.execution.Scheduler | |
import monix.reactive.Observable | |
/**
 * Small benchmark that counts the elements of the same iterator once with an
 * fs2 `Stream` and once with a Monix `Observable`, printing the count and the
 * elapsed time of every run.
 */
object StreamingPoc {

  import cats.syntax.apply._
  import cats.syntax.flatMap._
  import cats.syntax.functor._

  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // Needed by IO.sleep in main.
  private implicit val ioTimer: Timer[IO] = IO.timer(global)
  // Needed by Task#runSyncUnsafe in main.
  private implicit val scheduler: Scheduler = monix.execution.Scheduler.Implicits.global

  /**
   * Runs the timed fs2 count three times (sleeping 2 seconds after the first
   * run), then does the same for the timed Monix count.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    println("fs2")
    val fs2Op = time(fs2Count())
    val fs2Ops = fs2Op *> IO.sleep(2.seconds) *> fs2Op *> fs2Op
    fs2Ops.unsafeRunSync()

    println("monix")
    val monixOp = time(monixCount())
    val monixOps = monixOp *> Task.sleep(2.seconds) *> monixOp *> monixOp
    // Blocks the calling thread; gives up if the three runs take over a minute.
    monixOps.runSyncUnsafe(1.minute)
  }

  /**
   * Counts the elements produced by [[iterator]] by folding over an fs2
   * `Stream`, then prints the count via [[printCount]].
   */
  def fs2Count(): IO[Unit] = {
    for {
      // iterator() gives an iterator with 5,000,000 elements; it is created
      // inside IO.delay so that every run of this IO gets a fresh iterator.
      stream <- IO.delay { Stream.fromIterator[IO](iterator()) }
      count  <- stream.compile.fold(0) { (acc, _) => acc + 1 }
      _      <- printCount[IO](count)
    } yield ()
  }

  /**
   * Counts the elements produced by [[iterator]] by folding over a Monix
   * `Observable`, then prints the count via [[printCount]].
   */
  def monixCount(): Task[Unit] = {
    // iterator() gives an iterator with 5,000,000 elements
    val stream = Observable.fromIterator { Task(iterator()) }
    val count = stream.foldLeftL(0)((acc, _) => acc + 1)
    count.flatMap(c => printCount[Task](c))
  }

  /** Returns a new iterator over the integers 1 to 5,000,000 (inclusive). */
  def iterator(): Iterator[Int] = (1 to 5_000_000).iterator

  /**
   * Prints `count` to standard output as `count: <n>`, formatted with the
   * default-locale integer `NumberFormat`.
   *
   * @param count the number to print
   * @param F     used to suspend the side effect in `F`
   */
  def printCount[F[_]](count: Int)(implicit F: Sync[F]): F[Unit] = F.delay {
    val formatted = NumberFormat.getIntegerInstance.format(count)
    println(s"count: $formatted")
  }

  /**
   * Runs `fa`, prints how long it took as a `java.time.Duration` followed by
   * a blank line, and yields the result of `fa`. Nothing is printed when `fa`
   * fails.
   *
   * @param fa the effect to measure
   * @param F  used to suspend the clock reads and the println in `F`
   * @return the value produced by `fa`
   */
  def time[F[_], A](fa: F[A])(implicit F: Sync[F]): F[A] = {
    for {
      // System.nanoTime is monotonic, so the measurement cannot be skewed by
      // wall-clock adjustments happening while `fa` runs.
      start   <- F.delay(System.nanoTime())
      a       <- fa
      end     <- F.delay(System.nanoTime())
      elapsed  = java.time.Duration.ofNanos(end - start)
      _       <- F.delay(println(s"$elapsed \n"))
    } yield a
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
In case someone's wondering... The difference is quite shocking (6s for fs2 vs 0.15s for Monix). However, after some changes it's less so.
Now the results are comparable, about 0.2s. Apparently fs2's
Stream.fromIterator[IO](...)
is inefficient. PS: It's not my idea. I found the discussion on fs2's Gitter channel and thought it was worth sharing in case someone runs this benchmark and is shocked, like I was yesterday.