Skip to content

Instantly share code, notes, and snippets.

@kamilkloch
Created November 30, 2023 18:01
Show Gist options
  • Save kamilkloch/aee71dc16855058ce79e3d7ee699b199 to your computer and use it in GitHub Desktop.
Save kamilkloch/aee71dc16855058ce79e3d7ee699b199 to your computer and use it in GitHub Desktop.
fs2 Stream merge performance vs raw IOs
import cats.effect.std.Queue
import cats.effect.{IO, IOApp}
import fs2._
object StreamMergePerformanceTests extends IOApp.Simple {
def run: IO[Unit] = {
case class Payload(x: Integer)
Queue.bounded[IO, Payload](2).flatMap { q =>
val payload = Payload(123)
val n = 5000
val m = 50
val consumer = Stream.fromQueueUnterminated(q).drain
//val consumer = Stream.repeatEval(q.take)
val producer = Stream.exec(q.offer(payload)).repeatN(n)
//val producer = Stream.repeatEval(q.offer(payload)).take(n)
val testStreams = producer.concurrently(consumer).compile.drain
val testStreamsCompiled = consumer.compile.drain.background.use(_ => producer.compile.drain)
val testQueues = q.take.foreverM.background.use(_ => q.offer(payload).replicateA_(n))
IO.print("Concurrent drained streams: ") >>
testStreams.replicateA_(m) >>
testStreams.timed.map(_._1.toMillis).replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(IO.println) >>
IO.print("Drained streams, compiled, concurrently as IOs: ") >>
testStreamsCompiled.replicateA_(m) >>
testStreamsCompiled.timed.map(_._1.toMillis).replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(IO.println) >>
IO.print("Concurrent Queue#offer and Queue#take: ") >>
testQueues.replicateA_(m) >>
testQueues.timed.map(_._1.toMillis).replicateA(m).map(x => x.sum.toDouble / x.size).flatMap(IO.println)
}
}
}
@kamilkloch
Copy link
Author

kamilkloch commented Nov 30, 2023

Concurrent drained streams: 64.54
Drained streams, compiled, concurrently as IOs: 6.24
Concurrent Queue#offer and Queue#take: 1.42

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment