Skip to content

Instantly share code, notes, and snippets.

@guidoschmidt17
Last active December 15, 2022 15:55
Show Gist options
  • Save guidoschmidt17/73d6ef5facf85e7cf38fefa41e4371e8 to your computer and use it in GitHub Desktop.
Save guidoschmidt17/73d6ef5facf85e7cf38fefa41e4371e8 to your computer and use it in GitHub Desktop.
/** Reads the rows of `query` in the background and exposes them as a stream
  * that is fed through a bounded queue.
  *
  * The query is invoked with `args ~ offset ~ limit`. The number of rows already
  * handed to the queue is tracked in a `Ref`, so that a retried read continues
  * at the first row that has not been delivered yet instead of starting over.
  *
  * @param check     optional count query; when present it is run first and the
  *                  call fails with `NoResult(message)` if it yields a count
  *                  that is not greater than zero
  * @param query     row query taking the caller's arguments plus an offset and
  *                  a limit
  * @param args      caller-supplied arguments passed to both queries
  * @param message   text used for `NoResult` and handed to `catchAllErrors`
  * @param chunksize chunk size used for reading and capacity of the queue
  * @param limit     maximum number of rows to read; values below 1 mean
  *                  "no limit"
  * @return an effect yielding the output stream; the reader runs in a fiber
  *         forked into the surrounding scope (`forkScoped`)
  */
def readResultWithOffset[A, B](
    check: Option[Query[A, Long]],
    query: Query[A ~ Long ~ Long, B],
    args: A,
    message: String,
    chunksize: Int,
    limit: Long = Long.MaxValue
) =
  // Runs the count query and rejects a non-positive count with NoResult.
  def checkResult(countQuery: Query[A, Long]) =
    sessionPool
      .use(session =>
        session
          .prepare(countQuery)
          .toScopedZIO
          .flatMap(_.unique(args).retry(DefaultRetrySchedule))
      )
      .flatMap(n => if n > 0 then ZIO.succeed(n) else ZIO.fail(NoResult(message)))

  // One read attempt: streams rows [offset, n) into `out`, advancing `offset`
  // per delivered chunk. It deliberately does NOT signal end-of-stream itself,
  // because it is retried as a whole (see below).
  def loopResult(out: Queue[Take[StreamingError, B]], offset: Ref[Long], n: Long) =
    // NOTE(review): the stream is returned from `sessionPool.use` and only run
    // afterwards — confirm that the session is still valid/exclusive while the
    // stream is being consumed.
    def resultStream(offset: Long, limit: Long) =
      sessionPool
        .use(session =>
          session
            .prepare(query)
            .toScopedZIO
            .flatMap(prepared =>
              ZIO.succeed(
                prepared
                  .stream(args ~ offset ~ limit, chunksize)
                  .toZStream(chunksize)
              )
            )
        )
    for
      rowsread <- offset.get
      result <- resultStream(rowsread, n - rowsread)
      _ <- result.runForeachChunk(c => offset.update(_ + c.size) *> out.offer(Take.chunk(c)))
    yield ()

  // Without a count query the row count is treated as unbounded.
  val maxCountEffect = check match
    case Some(countQuery) => checkResult(countQuery)
    case None             => ZIO.succeed(Long.MaxValue)

  val result =
    for
      maxcount <- maxCountEffect
      n = math.min(maxcount, if limit < 1 then Long.MaxValue else limit)
      offset: Ref[Long] <- Ref.make(0L)
      out: Queue[Take[StreamingError, B]] <- Queue.bounded(chunksize)
      outstream = ZStream
        .fromQueueWithShutdown(out, out.capacity)
        .flattenTake
      // Take.end must be offered once, after all retries are done. Offering it
      // inside the retried effect would end the consumer's stream on the first
      // failed attempt and make the offset-based resume pointless.
      // NOTE(review): if the retries are exhausted the consumer still only sees
      // Take.end; consider offering a failure Take so truncation is visible.
      _ <- loopResult(out, offset, n)
        .retry(DefaultRetrySchedule)
        .ensuring(out.offer(Take.end))
        .forkScoped
    yield outstream
  result.catchAll(catchAllErrors(message))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment