Skip to content

Instantly share code, notes, and snippets.

@kamilkloch
Created July 20, 2023 15:43
Show Gist options
  • Save kamilkloch/09128c6be9dd1ada3f02f9b9061c6d66 to your computer and use it in GitHub Desktop.
Save kamilkloch/09128c6be9dd1ada3f02f9b9061c6d66 to your computer and use it in GitHub Desktop.
import zio.Clock.ClockLive
import zio.Console.ConsoleLive
import zio._
import zio.stream.ZStream
import java.io.IOException
import java.util.concurrent.TimeUnit
object ZioHub extends ZIOAppDefault {
override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = {
/** Feeds the provided hub with a timestamp every 500ms */
def tsService(hub: Hub[Long]): ZIO[Any, IOException, Nothing] = {
(for {
ts <- ClockLive.currentTime(TimeUnit.MILLISECONDS)
published <- hub.publish(ts)
_ <- if (published) ZIO.unit else ConsoleLive.printLine(s"Unsuccessful publish [ts=$ts]")
ts2 <- ClockLive.currentTime(TimeUnit.MILLISECONDS)
d = math.abs(ts2 - ts)
_ <- if (d > 1) ConsoleLive.printLine(s"Delay: ${math.abs(ts2 - ts)}ms") else ZIO.unit
} yield ())
.delay(500.millis).forever
}
val n = 10_000
val hubCapacity = 1024
ZIO.scoped {
for {
hub <- Hub.bounded[Long](hubCapacity)
scope <- ZIO.scope
_ <- tsService(hub).forkIn(scope)
_ <- ZIO.foreachParDiscard(List.fill(n)(0))(_ => ZStream.fromHub(hub).runDrain)
} yield ()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment