Skip to content

Instantly share code, notes, and snippets.

@SlavikBaranov
Created June 16, 2016 20:33
Show Gist options
  • Save SlavikBaranov/fcd11a575b564c0179b5d97509a83bf5 to your computer and use it in GitHub Desktop.
Save SlavikBaranov/fcd11a575b564c0179b5d97509a83bf5 to your computer and use it in GitHub Desktop.
// Requires "com.twitter" %% "util-core" % "6.34.0"
import com.twitter.concurrent.AsyncStream
import com.twitter.util.{Await, Future}
object TestStream extends App {
def foo(read: () => Future[String], write: String => Future[Unit]): Future[Unit] = {
def stream: AsyncStream[String] = AsyncStream.fromFuture(read()).flatMap(_ +:: stream)
stream.scanLeft(0)(_ + _.toInt)
.takeWhile(_ <= 1000000)
.foreachF(i => write(i.toString))
}
val t = System.nanoTime()
val f = foo(
read = () => Future.value("1"),
write = {
case "100000" =>
println("100k!")
Future.value(())
case "500000" =>
println("500k!")
Future.value(())
case _ =>
Future.value(())
}
)
Await.result(f)
val t2 = (System.nanoTime() - t) / 1000000000d
println(s"Execution time: ${t2}s")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment