Skip to content

Instantly share code, notes, and snippets.

@t3hnar
Last active May 17, 2017 06:30
Show Gist options
  • Save t3hnar/cd9cb6757d6c322398764b5feaae70c1 to your computer and use it in GitHub Desktop.
Save t3hnar/cd9cb6757d6c322398764b5feaae70c1 to your computer and use it in GitHub Desktop.
package akka_streams_test
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Future
object ConsistentHashingParallelism extends App {
implicit val system = ActorSystem("ConsistentHashingParallelism")
implicit val materializer = ActorMaterializer()
import system.dispatcher
case class Elem(id: Int, n: Int)
val parallelism = 10
val source = Source.queue[Elem](Int.MaxValue, OverflowStrategy.fail)
val sink = Sink.seq[Elem]
val flow = Flow[Elem]
.groupBy(parallelism, x => math.abs(x.id.hashCode() % parallelism))
.map { x => Thread.sleep(1); x }
.mergeSubstreams
val (queue, seq) = source.via(flow).toMat(sink)(Keep.both).run()
val futures = for {
id <- 1 to 10
} yield Future {
Thread.sleep(1)
for {
n <- 1 to 1000
} queue.offer(Elem(id, n))
}
Future.sequence(futures) onSuccess { case _ => queue.complete() }
for {
seq <- seq
} {
for {
(id, elems) <- seq.groupBy(_.id)
} {
val ns = elems map { _.n }
println(s"$id: ${ ns.sorted == ns }")
}
system.terminate()
}
}
@t3hnar
Copy link
Author

t3hnar commented May 17, 2017

To parallelize computation and make sure we preserve order of elements with same id

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment