@Ichoran
Created August 8, 2024 16:52
Benchmarking Cats Effect 3 mutex vs. Java Loom with java.util.concurrent.Semaphore
/*
* Copyright 2020-2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Rex Kerr (all rights for this test code ceded to Typelevel).
*/
//> using scala 3.5.0-RC6
//> using dep com.github.ichoran::kse3-flow:0.3.7
//> using dep com.github.ichoran::kse3-maths:0.3.7
//> using dep org.typelevel::cats-core:2.12.0
//> using dep org.typelevel::cats-effect:3.5.4
// Run with scala-cli --power --jmh --jvm=21 MutexBench.scala
// If you change the benchmarked classes, you may need to clear the build cache first: rm -r .scala-build
package cats.effect.benchmarks
import cats.effect.IO
import cats.effect.std._
import cats.effect.unsafe.implicits.global
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole
import java.util.concurrent.TimeUnit
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 10, time = 2000, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 4000, timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(10000)
@Fork(2)
@Threads(1)
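// Each benchmark invocation performs 10,000 lock-protected increments in total
// (hence @OperationsPerInvocation(10000)), so throughput is reported per increment.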
class MutexBenchmark:
import cats.syntax.all.*
import kse.basics.*
import kse.flow.*
@Param(Array("1", "5", "20", "100"))
var fibers: Int = -1
//@Param(Array("10000"))
//var acquires: Int = -1
val acquires = 10000
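
  // "Complete" shape: every step forks `fibers` parallel tasks that each take
  // the lock once and bump the counter; the step repeats acquires/fibers times,
  // so each invocation performs `acquires` increments in total.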
private def mutexCompleteImpl(mutex: IO[Mutex[IO]], n: Atom.Count): IO[Unit] =
if fibers == 1 then
mutex.flatMap: m =>
m.lock.use{ _ => IO(n.++) }.replicateA_(acquires)
else
mutex.flatMap: m =>
m.lock.use{ _ => IO(n.++) }.parReplicateA_(fibers)
.replicateA_(acquires/fibers)
@Benchmark
def mutexComplete(): Unit =
val n = Atom.Count()
mutexCompleteImpl(mutex = Mutex.apply, n = n).unsafeRunSync()
if n() != acquires then
throw new Exception(s"Got ${n()} updates and expected $acquires")
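
  // "Transpose" shape: the loop is turned inside out relative to Complete:
  // each of `fibers` long-lived parallel tasks takes the lock acquires/fibers
  // times, instead of tasks being re-forked for every acquisition.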
private def mutexTransposeImpl(mutex: IO[Mutex[IO]], n: Atom.Count): IO[Unit] =
if fibers == 1 then
mutex.flatMap: m =>
m.lock.use{ _ => IO(n.++) }.replicateA_(acquires)
else
mutex.flatMap: m =>
m.lock.use{ _ => IO(n.++) }.replicateA_(acquires/fibers)
.parReplicateA_(fibers)
@Benchmark
def mutexTranspose(): Unit =
val n = Atom.Count()
mutexTransposeImpl(mutex = Mutex.apply, n = n).unsafeRunSync()
if n() != acquires then
throw new Exception(s"Got ${n()} updates and expected $acquires")
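
  // "Killed" shape: while holding the lock, fork a fiber that waits on the
  // same lock, cede, then cancel the waiter. Note that n.++ is a raw side
  // effect inside the flatMap callback, so it still runs once per iteration.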
private def mutexKilledImpl(mutex: IO[Mutex[IO]], n: Atom.Count): IO[Unit] =
if fibers == 1 then
mutex
.flatMap: m =>
m.lock.surround:
m.lock.use_.start.flatMap{ fiber => n.++; IO.cede >> fiber.cancel }
.replicateA_(acquires)
else
mutex
.flatMap: m =>
m.lock.surround:
m.lock.use_.start.flatMap{ fiber => n.++; IO.cede >> fiber.cancel }.parReplicateA_(fibers)
.replicateA_(acquires/fibers)
@Benchmark
def mutexKilled(): Unit =
val n = Atom.Count()
mutexKilledImpl(mutex = Mutex.apply, n = n).unsafeRunSync()
if n() != acquires then
throw new Exception(s"Got ${n()} updates and expected $acquires")
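
  // AtomicCell variants: the same increment workload, with mutual exclusion
  // provided by AtomicCell.evalUpdate rather than an explicit Mutex.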
private def atomicCellCompleteImpl(cell: IO[AtomicCell[IO, Int]]): IO[Unit] =
if fibers == 1 then
cell.flatMap: c =>
c.evalUpdate(i => IO(i + 1)).replicateA_(acquires)
else
cell.flatMap:
c => c.evalUpdate(i => IO(i + 1)).parReplicateA_(fibers)
.replicateA_(acquires/fibers)
@Benchmark
def atomicCellComplete(): Unit =
atomicCellCompleteImpl(cell = AtomicCell.concurrent(0)).unsafeRunSync()
private def atomicCellTransposeImpl(cell: IO[AtomicCell[IO, Int]]): IO[Unit] =
if fibers == 1 then
cell.flatMap: c =>
c.evalUpdate(i => IO(i + 1)).replicateA_(acquires)
else
cell.flatMap:
c => c.evalUpdate(i => IO(i + 1)).replicateA_(acquires/fibers)
.parReplicateA_(fibers)
@Benchmark
def atomicCellTranspose(): Unit =
    atomicCellTransposeImpl(cell = AtomicCell.concurrent(0)).unsafeRunSync()
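
  // Plain-JDK baseline: a fair, single-permit java.util.concurrent.Semaphore
  // used as a mutex around the critical section.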
  extension (s: java.util.concurrent.Semaphore)
    inline def held[A](inline f: => A): A =
      s.acquire()          // acquire before the try: if acquire itself fails
      try f                // (e.g. on interrupt), we must not release a permit
      finally s.release()  // that was never obtained
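
  // Fu: forks a task (a Loom virtual thread, per this gist's framing) and
  // .ask() waits for its result, so the count check below only passes once
  // every task has finished.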
@Benchmark
def kseSemaphoreComplete(bh: Blackhole): Unit =
val m = new java.util.concurrent.Semaphore(1, true)
val n = Atom.Count()
(acquires/fibers).times:
val fs = fibers.arrayed: i =>
Fu:
m.held:
n.++
fs.map(_.ask())
if n() != acquires then
throw new Exception(s"Got ${n()} updates and expected $acquires")
@Benchmark
def kseSemaphoreTranspose(bh: Blackhole): Unit =
val m = new java.util.concurrent.Semaphore(1, true)
val n = Atom.Count()
val fs = fibers.arrayed: i =>
Fu:
(acquires/fibers).times:
m.held:
n.++
fs.map(_.ask())
if n() != acquires then
throw new Exception(s"Got ${n()} updates and expected $acquires")
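
  // Killed analogue for threads: each task interrupts its own thread right
  // after incrementing, as the closest plain-threads stand-in for fiber
  // cancellation.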
@Benchmark
def kseSemaphoreKilled(bh: Blackhole): Unit =
val m = new java.util.concurrent.Semaphore(1, true)
val n = Atom.Count()
(acquires/fibers).times:
val fs = fibers.arrayed: i =>
Fu:
m.held:
n.++
Thread.currentThread.interrupt()
fs.map(_.ask())
if n() != acquires then
throw new Exception(s"Got ${n()} updates and expected $acquires")
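
The semaphore benchmarks above lean on kse3's Fu / .ask(). For readers who only
know the JDK, here is a rough raw-Loom equivalent of the transpose benchmark.
This is a sketch under the assumption that Fu: forks a virtual thread and
.ask() joins it; it is not a description of kse3's actual implementation, and
rawLoomTranspose is an illustrative name.

// Hypothetical raw-Loom version of kseSemaphoreTranspose.
import java.util.concurrent.Semaphore

def rawLoomTranspose(fibers: Int, acquires: Int): Long =
  val m = new Semaphore(1, true)   // fair, single permit: a mutex
  var n = 0L                       // guarded by m
  val threads = Array.tabulate(fibers) { _ =>
    Thread.ofVirtual().start { () =>
      var i = acquires / fibers
      while i > 0 do
        m.acquire()
        try n += 1
        finally m.release()
        i -= 1
    }
  }
  threads.foreach(_.join())        // wait for all workers, like .ask()
  n

Fairness (the `true` flag) matters here: an unfair semaphore would let running
threads barge past waiters and change the contention profile being measured.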

GOAL: create `fibers` tasks and run them in parallel to get the mutex, add 1,
quit. Repeat until 10,000 additions have been performed.
("<<" marks the winning implementation for each fiber count.)

                                     OpenJDK 21         GraalVM CE 21
Benchmark             (fibers)    Score    Error      Score    Error     Units   Win
mutexComplete                1    3.414 ±  0.241      4.055 ±  0.082    ops/us   <<
atomicCellComplete           1    3.049 ±  0.074      3.598 ±  0.159    ops/us
kseSemaphoreComplete         1    0.432 ±  0.017      0.461 ±  0.014    ops/us
mutexComplete                5    0.108 ±  0.016      0.147 ±  0.021    ops/us
atomicCellComplete           5    0.090 ±  0.012      0.133 ±  0.023    ops/us
kseSemaphoreComplete         5    1.301 ±  0.033      1.507 ±  0.018    ops/us   <<
mutexComplete               20    0.180 ±  0.068      0.142 ±  0.018    ops/us
atomicCellComplete          20    0.096 ±  0.016      0.123 ±  0.011    ops/us
kseSemaphoreComplete        20    1.986 ±  0.021      2.240 ±  0.015    ops/us   <<
mutexComplete              100    0.113 ±  0.013      0.144 ±  0.010    ops/us
atomicCellComplete         100    0.096 ±  0.010      0.129 ±  0.014    ops/us
kseSemaphoreComplete       100    1.380 ±  0.023      1.288 ±  0.109    ops/us   <<

GOAL: create `fibers` tasks that run in parallel to get the mutex, add 1,
repeat. Each parallel task quits after 10,000/fibers repeats.

                                      OpenJDK 21         GraalVM CE 21
Benchmark              (fibers)    Score    Error      Score    Error     Units   Win
mutexTranspose                1    3.641 ±  0.118      4.029 ±  0.043    ops/us
atomicCellTranspose           1    2.790 ±  0.175      3.759 ±  0.086    ops/us
kseSemaphoreTranspose         1   73.059 ±  1.532     76.123 ±  0.456    ops/us   <<
mutexTranspose                5    5.061 ±  0.282      5.126 ±  0.233    ops/us   <<
atomicCellTranspose           5    0.124 ±  0.045      0.122 ±  0.014    ops/us
kseSemaphoreTranspose         5    2.023 ±  0.027      2.501 ±  0.036    ops/us
mutexTranspose               20    4.801 ±  0.485      5.305 ±  0.966    ops/us   <<
atomicCellTranspose          20    0.204 ±  0.053      0.124 ±  0.015    ops/us
kseSemaphoreTranspose        20    1.735 ±  0.080      2.162 ±  0.083    ops/us
mutexTranspose              100    8.294 ±  0.647      7.011 ±  0.376    ops/us   <<
atomicCellTranspose         100    0.169 ±  0.050      0.134 ±  0.017    ops/us
kseSemaphoreTranspose       100    0.998 ±  0.069      0.440 ±  0.029    ops/us

GOAL: create `fibers` tasks that run in parallel to get the mutex, add 1,
then cancel self! Repeat until 10,000 additions have been performed.

                                   OpenJDK 21         GraalVM CE 21
Benchmark           (fibers)    Score    Error      Score    Error     Units   Win
mutexKilled                1    0.114 ±  0.024      0.114 ±  0.012    ops/us
kseSemaphoreKilled         1    0.433 ±  0.011      0.457 ±  0.009    ops/us   <<
mutexKilled                5    0.108 ±  0.039      0.077 ±  0.019    ops/us
kseSemaphoreKilled         5    1.179 ±  0.049      1.373 ±  0.018    ops/us   <<
mutexKilled               20    0.138 ±  0.059      0.112 ±  0.014    ops/us
kseSemaphoreKilled        20    1.211 ±  0.025      1.312 ±  0.011    ops/us   <<
mutexKilled              100    0.497 ±  0.271      0.191 ±  0.044    ops/us
kseSemaphoreKilled       100    0.871 ±  0.009      0.847 ±  0.124    ops/us   <<