Created December 13, 2019 16:45
Monix vs fs2
package com.example
import java.text.NumberFormat
import java.time.Instant
import cats.effect.{IO, Sync, Timer}
import fs2.{Chunk, Stream}
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
/** Proof-of-concept benchmark: counts the elements of a 5,000,000-element
  * iterator once through fs2 and once through Monix, printing the count and
  * the elapsed time of every run.
  */
object StreamingPoc {
  import cats.syntax.apply._
  import cats.syntax.flatMap._
  import cats.syntax.functor._
  import scala.concurrent.duration._

  // Timer required by `IO.sleep`. The execution context is fully qualified
  // because no import visible in this file brings a bare `global` into scope.
  private implicit val ioTimer: Timer[IO] =
    IO.timer(scala.concurrent.ExecutionContext.global)

  // Scheduler on which the Monix `Task` program is executed.
  private implicit val scheduler: Scheduler = Scheduler.global

  /** Runs the fs2 benchmark and then the Monix benchmark.
    *
    * Each benchmark is one timed run, a 2-second pause, and two more timed
    * runs (presumably so the first run absorbs JVM warm-up; verify intent).
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val fs2Op = time(fs2Count())
    val fs2Ops = fs2Op *> IO.sleep(2.seconds) *> fs2Op *> fs2Op

    val monixOp = time(monixCount())
    val monixOps = monixOp *> Task.sleep(2.seconds) *> monixOp *> monixOp

    // Both programs are only descriptions until they are explicitly run.
    fs2Ops.unsafeRunSync()
    monixOps.runSyncUnsafe()
  }

  /** Counts the elements of [[iterator]] with an fs2 `Stream` and prints the total.
    *
    * @return an `IO` that performs the count and the printing when run
    */
  def fs2Count(): IO[Unit] =
    for {
      // The stream is built inside `IO.delay` so that every run of this IO
      // calls `iterator()` again and gets a fresh 5,000,000-element iterator.
      stream <- IO.delay(Stream.fromIterator[IO](iterator()))
      count <- stream.compile.fold(0)((acc, _) => acc + 1)
      _ <- printCount[IO](count)
    } yield ()

  /** Counts the elements of [[iterator]] with a Monix `Observable` and prints the total.
    *
    * @return a `Task` that performs the count and the printing when run
    */
  def monixCount(): Task[Unit] = {
    // `iterator()` is wrapped in a Task, so it yields a fresh
    // 5,000,000-element iterator whenever that Task is evaluated.
    val stream = Observable.fromIterator(Task(iterator()))
    val count = stream.foldLeftL(0)((acc, _) => acc + 1)
    count.flatMap(c => printCount[Task](c))
  }

  /** A new iterator over the integers 1 to 5,000,000. */
  def iterator(): Iterator[Int] = (1 to 5_000_000).iterator

  /** Prints `count`, formatted with the default locale's integer format.
    *
    * @param count the number to print
    * @tparam F effect type in which the printing is suspended
    * @return an effect that prints `count: <formatted>` when run
    */
  def printCount[F[_]](count: Int)(implicit F: Sync[F]): F[Unit] = F.delay {
    val formatted = NumberFormat.getIntegerInstance.format(count)
    println(s"count: $formatted")
  }

  /** Runs `fa`, prints how long it took, and returns its result.
    *
    * The elapsed time is the `java.time.Duration` between two `Instant.now()`
    * readings taken immediately before and after `fa`.
    *
    * @param fa the effect to measure
    * @tparam F effect type; must have a `Sync` instance
    * @tparam A result type of `fa`
    * @return an effect producing the result of `fa` after printing the duration
    */
  def time[F[_], A](fa: F[A])(implicit F: Sync[F]): F[A] =
    for {
      start <- F.delay(Instant.now())
      a <- fa
      end <- F.delay(Instant.now())
      duration <- F.delay(java.time.Duration.between(start, end))
      _ <- F.delay(println(s"$duration \n"))
    } yield a
}
DLakomy commented Nov 23, 2020

In case someone's wondering... The difference is quite shocking (about 6s for fs2 vs. 0.15s for Monix). However, after the following changes the gap mostly disappears.

private val list = iterator().toList // added
stream <- IO.delay { Stream.emits(list).covary[IO] } // instead of line 39
val stream = Observable.fromIterable(list) // instead of line 47

Now the results are comparable, about 0.2s. Apparently fs2's Stream.fromIterator[IO](...) is inefficient.

P.S. This isn't my idea. I found the discussion on fs2's Gitter channel and thought it was worth sharing in case someone runs this benchmark and is as shocked as I was yesterday.

