Skip to content

Instantly share code, notes, and snippets.

@thobson
Created December 13, 2019 16:45
Show Gist options
  • Save thobson/5dc00dc03f7d8656422d62427ee0f5e7 to your computer and use it in GitHub Desktop.
Save thobson/5dc00dc03f7d8656422d62427ee0f5e7 to your computer and use it in GitHub Desktop.
Monix vs fs2
package com.example
import java.text.NumberFormat
import java.time.Instant
import cats.effect.{IO, Sync, Timer}
import fs2.{Chunk, Stream}
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
object StreamingPoc {
import cats.syntax.apply._
import cats.syntax.flatMap._
import cats.syntax.functor._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
private implicit val ioTimer: Timer[IO] = IO.timer(global)
private implicit val scheduler: Scheduler = monix.execution.Scheduler.Implicits.global
def main(args: Array[String]): Unit = {
println("fs2")
val fs2Op = time(fs2Count())
val fs2Ops = fs2Op *> IO.sleep(2.seconds) *> fs2Op *> fs2Op
fs2Ops.unsafeRunSync()
println("monix")
val monixOp = time(monixCount())
val monixOps = monixOp *> Task.sleep(2.seconds) *> monixOp *> monixOp
monixOps.runSyncUnsafe(1.minute)
}
def fs2Count(): IO[Unit] = {
for {
// iterator() gives an interator with 5,000,000 elements
stream <- IO.delay { Stream.fromIterator[IO](iterator()) }
count <- stream.compile.fold(0) { (acc, _) => acc + 1 }
_ <- printCount[IO](count)
} yield ()
}
def monixCount(): Task[Unit] = {
// iterator() gives an interator with 5,000,000 elements
val stream = Observable.fromIterator { Task(iterator()) }
val count = stream.foldLeftL(0)((acc, _) => acc + 1)
count.flatMap(c => printCount[Task](c))
}
def iterator(): Iterator[Int] = (1 to 5_000_000).iterator
def printCount[F[_]](count: Int)(implicit F: Sync[F]): F[Unit] = F.delay {
val formatted = NumberFormat.getIntegerInstance.format(count)
println(s"count: $formatted")
}
def time[F[_], A](fa: F[A])(implicit F: Sync[F]): F[A] = {
for {
start <- F.delay(Instant.now())
a <- fa
end <- F.delay(Instant.now())
duration <- F.delay(java.time.Duration.between(start, end))
_ <- F.delay(println(s"$duration \n"))
} yield a
}
}
@DLakomy
Copy link

DLakomy commented Nov 23, 2020

In case someone's wondering... The difference is quite shocking (6s fs2 vs 0.15 Monix). However, after some changes it's less so.

private val list = iterator().toList // added
[...]
stream <- IO.delay { Stream.emits(list).covary[IO] } // instead of line 39
[...]
val stream = Observable.fromIterable(list) // instead of line 47

Now the results are comparable, about 0.2s. Apparently fs2's Stream.fromIterator[IO](...) is inefficient.

PS It's not my idea. I've found the discussion on fs2's gitter channel and thought it's worth sharing in case someone runs this benchmark and is shocked, like I was yesterday.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment