Skip to content

Instantly share code, notes, and snippets.

@samspills
Created May 4, 2023 13:34
Show Gist options
  • Save samspills/b35a4aed396af0e8eb2fd681e4ea409f to your computer and use it in GitHub Desktop.
Save samspills/b35a4aed396af0e8eb2fd681e4ea409f to your computer and use it in GitHub Desktop.
//> using lib "co.fs2::fs2-core:3.5.0"
//> using lib "org.typelevel::cats-effect:3.4.6"
import cats.effect.{IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._
/** Scratchpad application for watching how fs2 combinators interact with chunking.
  *
  * Every check pushes the integers 0 to 99 through a pipeline, logging each chunk on the way in
  * ("Chunk before anything") and on the way out ("Chunk after everything"). Effectful stages
  * print the value they received and then sleep for one second, so the interleaving of the log
  * lines shows the order in which the stages were evaluated. Point `run` at the check to observe.
  */
object StreamingChunksIO extends IOApp.Simple {

  /** The integers 0 to 99, regrouped into chunks of ten and flattened back into elements,
    * with every chunk that passes logged.
    */
  private def numbers: Stream[IO, Int] =
    Stream
      .emits[IO, Int](0 until 100)
      .chunkN(10) // Stream[IO, Chunk[Int]]
      .unchunks // Stream[IO, Int]
      .debugChunks(c => s"Chunk before anything: $c")

  /** Like `numbers`, but each group of ten stays a single `Chunk` element of the stream. */
  private def numberChunks: Stream[IO, fs2.Chunk[Int]] =
    Stream
      .emits[IO, Int](0 until 100)
      .chunkN(10)
      .debugChunks(c => s"Chunk before anything: $c")

  /** Announces `input` under the name of the running `stage`, sleeps one second, then yields
    * `result`.
    *
    * @param stage  name of the combinator (and its position) the effect runs inside; printed
    *               verbatim so log lines can be attributed to the right stage
    * @param input  the value the stage received; printed via its `toString`
    * @param result the already-computed value to produce once the sleep is over
    */
  private def slowly[A, B](stage: String, input: A)(result: B): IO[B] =
    IO.println(s"inside $stage, element before sleep: $input") *>
      IO.sleep(1.second) *>
      IO.pure(result)

  /** Logs every chunk leaving the pipeline, then runs the stream purely for its effects. */
  private def logAndDrain[A](out: Stream[IO, A]): IO[Unit] =
    out
      .debugChunks(c => s"Chunk after everything $c")
      .compile
      .drain

  /** Baseline: a pure `map` with no effects and no sleeping. */
  def mapCheck: IO[Unit] =
    logAndDrain(numbers.map(_ + 0.1))

  /** One slow stage run through `mapAsync` with a concurrency limit of 2. */
  def mapAsyncCheck: IO[Unit] =
    logAndDrain(numbers.mapAsync(2)(i => slowly("mapAsync", i)(i + 0.1)))

  /** One slow stage run through `evalMap`. */
  def evalMapCheck: IO[Unit] =
    logAndDrain(numbers.evalMap(i => slowly("evalMap", i)(i + 0.1)))

  /** One slow stage run through `evalMapChunk`. */
  def evalMapChunkCheck: IO[Unit] =
    logAndDrain(numbers.evalMapChunk(i => slowly("evalMapChunk", i)(i + 0.1)))

  /** Two consecutive slow `evalMapChunk` stages. */
  def twoEvalMapChunkCheck: IO[Unit] =
    logAndDrain(
      numbers
        .evalMapChunk(i => slowly("first evalMapChunk", i)(i + 0.1))
        .evalMapChunk(d => slowly("second evalMapChunk", d)(d + 0.1))
    )

  /** One slow stage run through `parEvalMap` with a concurrency limit of 2.
    *
    * Same as `mapAsyncCheck`.
    */
  def parEvalMapCheck: IO[Unit] =
    logAndDrain(numbers.parEvalMap(2)(i => slowly("parEvalMap", i)(i + 0.1)))

  /** `parEvalMap(2)` where each stream element is a whole chunk of ten rather than one number. */
  def chunkedParEvalMapCheck: IO[Unit] =
    logAndDrain(numberChunks.parEvalMap(2)(c => slowly("parEvalMap", c)(c.map(_ + 0.1))))

  /** One slow stage run through `parEvalMapUnordered` with a concurrency limit of 2. */
  def parEvalMapUnorderedCheck: IO[Unit] =
    logAndDrain(
      numbers.parEvalMapUnordered(2)(i => slowly("parEvalMapUnordered", i)(i + 0.1))
    )

  /** `prefetchN(2)` placed before a single slow `evalMapChunk` stage. */
  def prefetchNBeforeMapCheck: IO[Unit] =
    logAndDrain(
      numbers
        .prefetchN(2)
        .evalMapChunk(i => slowly("evalMapChunk", i)(i + 0.1))
    )

  /** `prefetchN(2)` placed after a single slow `evalMapChunk` stage. */
  def prefetchNAfterMapCheck: IO[Unit] =
    logAndDrain(
      numbers
        .evalMapChunk(i => slowly("evalMapChunk", i)(i + 0.1))
        .prefetchN(2)
    )

  /** `prefetchN(2)` placed between two slow `evalMapChunk` stages. */
  def prefetchNBetweenMapsCheck: IO[Unit] =
    logAndDrain(
      numbers
        .evalMapChunk(i => slowly("first evalMapChunk", i)(i + 0.1))
        .prefetchN(2)
        .evalMapChunk(d => slowly("second evalMapChunk", d)(d + 0.1))
    )

  /** `prefetchN(2)` between two slow `evalMap` stages whose elements are whole chunks of ten. */
  def chunkedPrefetchNBetweenMapsCheck: IO[Unit] =
    logAndDrain(
      numberChunks
        .evalMap(c => slowly("first evalMap", c)(c.map(_ + 0.1)))
        .prefetchN(2)
        .evalMap(c => slowly("second evalMap", c)(c.map(_ + 0.1)))
    )

  /** A slow stage, then `prefetchN(1)`, then two more slow stages back to back. */
  def mapPrefetchNMapMapCheck: IO[Unit] =
    logAndDrain(
      numbers
        .evalMapChunk(i => slowly("first evalMapChunk", i)(i + 0.1))
        .prefetchN(1)
        .evalMapChunk(d => slowly("second evalMapChunk", d)(d + 0.1))
        .evalMapChunk(d => slowly("third evalMapChunk", d)(d + 0.1))
    )

  /** A slow stage, then `prefetchN(10)`, then nine more slow stages back to back. */
  def mapPrefetchNTenMapsCheck: IO[Unit] = {
    val prefetched: Stream[IO, Double] =
      numbers
        .evalMapChunk(i => slowly("first evalMapChunk", i)(i + 0.1))
        .prefetchN(10)

    // The remaining stages differ only in the ordinal they print, so they are chained in a
    // fold; stages are appended in list order.
    val laterStages = List(
      "second",
      "third",
      "fourth",
      "fifth",
      "sixth",
      "seventh",
      "eighth",
      "ninth",
      "tenth"
    )

    logAndDrain(
      laterStages.foldLeft(prefetched) { (stream, ordinal) =>
        stream.evalMapChunk(d => slowly(s"$ordinal evalMapChunk", d)(d + 0.1))
      }
    )
  }

  /** Three slow stages with a `prefetchN(1)` between each consecutive pair. */
  def mapThenPrefetchNMapTwiceCheck: IO[Unit] =
    logAndDrain(
      numbers
        .evalMapChunk(i => slowly("first evalMapChunk", i)(i + 0.1))
        .prefetchN(1)
        .evalMapChunk(d => slowly("second evalMapChunk", d)(d + 0.1))
        .prefetchN(1)
        .evalMapChunk(d => slowly("third evalMapChunk", d)(d + 0.1))
    )

  /** Unfinished stub: currently just the raw 0 to 99 source, with no metering applied and no
    * caller in this object.
    */
  def meteredMaybe: Stream[IO, Int] =
    Stream.emits[IO, Int]((0 until 100).toList)

  /** Entry point; swap in whichever check should be observed. */
  def run: IO[Unit] = mapPrefetchNMapMapCheck
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment