Created
May 4, 2023 13:34
-
-
Save samspills/b35a4aed396af0e8eb2fd681e4ea409f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//> using lib "co.fs2::fs2-core:3.5.0" | |
//> using lib "org.typelevel::cats-effect:3.4.6" | |
import cats.effect.{IO, IOApp} | |
import fs2.Stream | |
import scala.concurrent.duration._ | |
object StreamingChunksIO extends IOApp.Simple { | |
def mapCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.map(i => i + 0.1) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def mapAsyncCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.mapAsync(2) { i => | |
IO.println(s"inside evalMap, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
} | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def evalMapCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.evalMap(i => | |
IO.println(s"inside evalMap, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def evalMapChunkCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.evalMapChunk(i => | |
IO.println(s"inside evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def twoEvalMapChunkCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.evalMapChunk(i => | |
IO.println(s"inside first evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.evalMapChunk(i => | |
IO.println(s"inside second evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
// same as mapAsyncCheck | |
def parEvalMapCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) // Stream[IO, Chunk[Int]] | |
.unchunks // Stream[IO, Int] | |
.debugChunks(c => s"Chunk before anything: $c") | |
.parEvalMap(2) { i => | |
IO.println(s"inside evalMap, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
} | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def chunkedParEvalMapCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.debugChunks(c => s"Chunk before anything: $c") | |
.parEvalMap(2) { c => | |
IO.println(s"inside parEvalMap, element before sleep: $c") *> | |
IO.sleep(1.second) *> | |
IO.pure(c.map(_ + 0.1)) | |
} | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def parEvalMapUnorderedCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.parEvalMapUnordered(2) { i => | |
IO.println(s"inside evalMap, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
} | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def prefetchNBeforeMapCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.prefetchN(2) | |
.evalMapChunk(i => | |
IO.println(s"inside evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def prefetchNAfterMapCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.evalMapChunk(i => | |
IO.println(s"inside evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.prefetchN(2) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def prefetchNBetweenMapsCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.evalMapChunk(i => | |
IO.println(s"inside first evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.prefetchN(2) | |
.evalMapChunk(i => | |
IO.println(s"inside second evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def chunkedPrefetchNBetweenMapsCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.debugChunks(c => s"Chunk before anything: $c") | |
.evalMap(c => | |
IO.println(s"inside first evalMapChunk, element before sleep: $c") *> | |
IO.sleep(1.second) *> | |
IO.pure(c.map(_ + 0.1)) | |
) | |
.prefetchN(2) | |
.evalMap(c => | |
IO.println(s"inside second evalMapChunk, element before sleep: $c") *> | |
IO.sleep(1.second) *> | |
IO.pure(c.map(_ + 0.1)) | |
) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def mapPrefetchNMapMapCheck = Stream | |
.emits[IO, Int](0 until 100) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.evalMapChunk(i => | |
IO.println(s"inside first evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.prefetchN(1) | |
.evalMapChunk(i => | |
IO.println(s"inside second evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.evalMapChunk(i => | |
IO.println(s"inside third evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def mapPrefetchNTenMapsCheck = Stream | |
.emits[IO, Int]((0 until 100).toList) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.evalMapChunk(i => | |
IO.println(s"inside first evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.prefetchN(10) | |
.evalMapChunk(i => | |
IO.println(s"inside second evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.evalMapChunk(i => | |
IO.println(s"inside third evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.evalMapChunk(i => | |
IO.println(s"inside fourth evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.evalMapChunk(i => | |
IO.println(s"inside fifth evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.evalMapChunk(i => | |
IO.println(s"inside sixth evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.evalMapChunk(i => | |
IO.println(s"inside seventh evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.evalMapChunk(i => | |
IO.println(s"inside eighth evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.evalMapChunk(i => | |
IO.println(s"inside ninth evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.evalMapChunk(i => | |
IO.println(s"inside tenth evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def mapThenPrefetchNMapTwiceCheck = Stream | |
.emits[IO, Int]((0 until 100).toList) | |
.chunkN(10) | |
.unchunks | |
.debugChunks(c => s"Chunk before anything: $c") | |
.evalMapChunk(i => | |
IO.println(s"inside first evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.prefetchN(1) | |
.evalMapChunk(i => | |
IO.println(s"inside second evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.prefetchN(1) | |
.evalMapChunk(i => | |
IO.println(s"inside third evalMapChunk, element before sleep: $i") *> | |
IO.sleep(1.second) *> | |
IO.pure(i + 0.1) | |
) | |
.debugChunks(c => s"Chunk after everything $c") | |
.compile | |
.drain | |
def meteredMaybe = Stream.emits[IO, Int]((0 until 100).toList) | |
def run = mapPrefetchNMapMapCheck | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment