Skip to content

Instantly share code, notes, and snippets.

@guidoschmidt17
Last active December 13, 2022 22:39
Show Gist options
  • Save guidoschmidt17/4fc855677e3ee0c5d9eba4fcd982c56f to your computer and use it in GitHub Desktop.
Save guidoschmidt17/4fc855677e3ee0c5d9eba4fcd982c56f to your computer and use it in GitHub Desktop.
package cqrs
package eventstore
package server
import cats.effect.std.Console
import fs2.io.net.SocketOption.*
import natchez.Trace.Implicits.noop
import skunk.Session
import zio.*
import zio.interop.catz.*
import EventStoreImpl.*
private object SessionPool:
val make =
inline given Console[([A] =>> Task[A])] = Console.make
Session
.pooled(
host = "localhost",
port = 5432,
user = "jimmy",
database = "db",
password = Some("banana"),
socketOptions = List(noDelay(true), receiveBufferSize(DefaultBufferSize), sendBufferSize(DefaultBufferSize)),
max = 64,
debug = false
)
.toManagedZIO
.map(_.toManagedZIO)
.scoped
val live = ZLayer.scoped(make)
inline final val DefaultBufferSize = 2 * 1024 * 1024
inline final private def readResultWithOffset[A, B](
check: Option[Query[A, Long]],
query: Query[A ~ Long ~ Long, B],
args: A,
message: String,
chunksize: Int,
limit: Long = Long.MaxValue
) =
val checkResult =
sessionPool.scoped
.flatMap(session => {
session
.prepare(check.get)
.toManagedZIO
.scoped
.flatMap(_.unique(args).retry(DebugRetrySchedule(s"countResultWithOffset $message $chunksize $limit")))
})
.flatMap(n => if n > 0 then ZIO.succeed(n) else ZIO.fail(NoResult(message)))
def loopResult(out: Queue[Take[Nothing, B]], offset: Ref[Long], n: Long) =
def resultStream(offset: Long, limit: Long) =
sessionPool.scoped
.flatMap(session => {
session
.prepare(query)
.toManagedZIO
.scoped
.flatMap { query =>
ZIO.succeed(
query
.stream(args ~ offset ~ limit, chunksize)
.toZStream(chunksize)
.tapErrorCause(ZIO.debug(_))
)
}
})
for
rowsread <- offset.get
result <- resultStream(rowsread, n - rowsread)
_ <- result.runForeach(b => offset.update(_ + 1) *> out.offer(Take.single(b)))
_ <- out.offer(Take.end)
yield ()
val result = for
maxcount <- if check.isDefined then checkResult.tapErrorCause(ZIO.debug(_)) else ZIO.succeed(Long.MaxValue)
n = math.min(maxcount, if limit < 1 then Long.MaxValue else limit)
offset: Ref[Long] <- Ref.make(0L)
out: Queue[Take[Nothing, B]] <- Queue.bounded(SmallSize)
outstream = ZStream
.fromQueue(out, out.capacity)
.flattenTake
_ <- loopResult(out, offset, n)
.retry(DebugRetrySchedule(s"readResultWithOffset $message $chunksize $offset $limit"))
.forkDaemon
yield outstream
result.catchAll(catchAllErrors(message))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment