Skip to content

Instantly share code, notes, and snippets.

@kamilkloch
Last active July 21, 2023 13:17
Show Gist options
  • Save kamilkloch/9a083701ee4ee9e2379d9585c8e4f0ed to your computer and use it in GitHub Desktop.
Save kamilkloch/9a083701ee4ee9e2379d9585c8e4f0ed to your computer and use it in GitHub Desktop.
import cats.effect.std.Supervisor
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import org.HdrHistogram.{ConcurrentHistogram, Histogram}
import scala.concurrent.duration.DurationInt
import scala.util.chaining.scalaUtilChainingOps
object RefProducer extends IOApp.Simple {
val n = 20_000
type Watermark = Long
case class Payload(ts: Long)
trait Sig {
def write: IO[Unit]
def read(prevWatermark: Option[Watermark]): IO[Option[(Watermark, Payload)]]
}
object Sig {
def create: IO[Sig] = {
IO.realTime.flatMap { ts =>
IO.ref(0L -> Payload(ts.toMillis)).map { state =>
new Sig {
def write: IO[Unit] = {
state.update { case (watermark, _) =>
(watermark + 1, Payload(System.currentTimeMillis()))
}
}
def read(prevWatermark: Option[Watermark]): IO[Option[(Watermark, Payload)]] = {
state.get.map { case (watermark, payload) =>
prevWatermark match {
case Some(prevWatermark) =>
if (watermark == prevWatermark)
None
else if (watermark == prevWatermark + 1)
Some((watermark, payload))
else throw new Exception(s"prevWatermark=$prevWatermark, watermark=$watermark")
case None => Some((watermark, payload))
}
}
}
}
}
}
}
}
/** Feeds the provided Sig with a timestamp every 500ms */
def tsService(sig: Sig): IO[Nothing] =
sig
.write
.timed
.flatMap(_._1.toMillis.pipe(t => IO.println(s"Producer published in ${t}ms")))
.delayBy(500.millis).foreverM.onCancel(IO.println("Canceled producer"))
def responseStream(n: Int, sig: Sig, prev: Option[(Watermark, Payload)], hist: Histogram): IO[Unit] = {
if (n <= 0) IO.unit else
sig.read(prev.map(_._1)).flatMap {
case x@Some((_, Payload(tsProducer))) =>
if (prev.isDefined) IO.realTime.flatMap { tsConsumer =>
hist.recordValue(math.abs(tsConsumer.toMillis - tsProducer))
responseStream(n - 1, sig, x, hist).delayBy(5.millis)
}
else responseStream(n - 1, sig, x, hist).delayBy(5.millis)
case None => responseStream(n - 1, sig, prev, hist).delayBy(5.millis)
}
}
private val hist = new ConcurrentHistogram(1L, 10_000L, 3)
def run: IO[Unit] = {
val doWork = Supervisor[IO].use { sup =>
Sig.create.flatMap { sig =>
tsService(sig).supervise(sup) >>
List.fill(n)(responseStream(1000, sig, None, hist)).parSequence_
}
}
IO.println("Warmup") >> doWork >> IO(hist.reset()) >> IO.sleep(2.second) >>
IO.println("Benchmark") >> doWork >> IO(hist.outputPercentileDistribution(System.out, 1.0))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment