Skip to content

Instantly share code, notes, and snippets.

@atamborrino
Last active April 25, 2017 14:14
Show Gist options
  • Save atamborrino/57f9095cb30d70e90d8507602a14b6bd to your computer and use it in GitHub Desktop.
Save atamborrino/57f9095cb30d70e90d8507602a14b6bd to your computer and use it in GitHub Desktop.
Parallel SCAN per node on a Redis Cluster as an Akka Stream Source
import akka.NotUsed
import akka.stream.scaladsl.Source
import redis.clients.jedis.{Jedis, JedisCluster, ScanParams}
import scala.collection.JavaConverters._
import scala.concurrent.Future
/**
 * Exposes a Redis Cluster key scan as an Akka Stream.
 *
 * @param maxNodeParallelism maximum number of per-node SCAN sub-streams merged at the same
 *                           time (used as the breadth of `flatMapMerge`)
 * @param jedisCluster       cluster client used to list the per-node connection pools
 * @param blockingEC         supplies the execution context on which every blocking Jedis call
 *                           made by this class is run
 */
class RedisStream(maxNodeParallelism: Int)
                 (implicit jedisCluster: JedisCluster,
                  blockingEC: RedisBlockingEC) {

  // Execution context used by all the Futures created below.
  implicit val ec = blockingEC.value

  /**
   * Runs SCAN on every master node of the cluster and emits the returned keys one by one.
   *
   * Nodes are scanned concurrently (at most `maxNodeParallelism` at a time); within one node
   * the SCAN calls are sequential, each one borrowing a connection from the node's pool and
   * returning it as soon as the call completes.
   *
   * @param scanParams parameters forwarded unchanged to every SCAN call
   * @return a source of keys; nothing is queried until the source is materialized
   */
  def scan(scanParams: ScanParams): Source[String, NotUsed] = {
    // SCAN starts at cursor "0", and the server hands back "0" once the iteration is complete.
    val startCursor = "0"

    // Master discovery: ask each node for its replication info and keep the masters only.
    // This is a local lazy val captured by the closure below, so the lookup is triggered by
    // the first materialization and its (possibly failed) result is reused by later
    // materializations of the same returned Source.
    lazy val futMasterNodes = Future(jedisCluster.getClusterNodes.asScala.toMap)
      .flatMap { nodes =>
        Future.traverse(nodes) { case (_, node) =>
          Future {
            ScalaCloseable.withResource(node.getResource()) { conn =>
              val info = conn.info("replication")
              if (info.contains("role:master")) Some(node)
              else None
            }
          }
        }
      }
      .map(_.flatten)

    Source.lazily(() => Source.fromFuture(futMasterNodes))
      .mapConcat(_.toList)
      .flatMapMerge(maxNodeParallelism, { node =>
        // Unfold state: Some(cursor) = next cursor to send, None = iteration finished.
        // The cursor is kept as the opaque string returned by the server instead of being
        // parsed into a Long: Redis defines it as an unsigned 64-bit number, which a signed
        // Long cannot always represent.
        Source.unfoldAsync[Option[String], List[String]](Some(startCursor)) {
          case None =>
            Future.successful(None)
          case Some(cursor) =>
            Future {
              ScalaCloseable.withResource(node.getResource()) { conn =>
                conn.scan(cursor, scanParams)
              }
            }.map { scanResult =>
              val keys = scanResult.getResult.asScala.toList
              val returnedCursor = scanResult.getStringCursor
              val nextState: Option[String] =
                if (returnedCursor == startCursor) None else Some(returnedCursor)
              // The last page is still emitted; the stream stops on the following pull.
              Some(nextState -> keys)
            }
        }
      })
      .mapConcat(_.toList)
      .mapMaterializedValue(_ => NotUsed)
  }
}
/** Loan-pattern helper for `AutoCloseable` resources. */
object ScalaCloseable {

  /**
   * Acquires a resource, applies `f` to it and closes the resource afterwards, whether `f`
   * returned normally or threw.
   *
   * If both `f` and `close()` throw, the exception from `f` is the one propagated and the
   * non-fatal failure from `close()` is attached to it as a suppressed exception, so the
   * root cause is never hidden by the cleanup error.
   *
   * @param a by-name expression producing the resource; evaluated exactly once. If it throws,
   *          the exception propagates and nothing is closed. If it yields `null`, `f` is
   *          still called with `null` and no `close()` is attempted.
   * @param f computation to run with the resource
   * @tparam A resource type
   * @tparam B result type of `f`
   * @return the value returned by `f`
   */
  def withResource[A >: Null <: AutoCloseable, B](a: => A)(f: A => B): B = {
    val resource: A = a
    // Remembers the failure raised by `f`, if any, so that `finally` knows how to close.
    var primary: Throwable = null
    try {
      f(resource)
    } catch {
      case t: Throwable =>
        primary = t
        throw t
    } finally {
      if (resource != null) {
        if (primary == null) {
          // Normal completion: a failure while closing is the only error, let it propagate.
          resource.close()
        } else {
          try resource.close()
          catch {
            // addSuppressed rejects self-suppression, hence the identity check.
            case scala.util.control.NonFatal(closeError) if closeError ne primary =>
              primary.addSuppressed(closeError)
          }
        }
      }
    }
  }
}
@vdebergue
Copy link

Améliorer la gestion de la connexion:

Future {
  val conn = node.getResource
  val res = conn.scan(cursor.toString, scanParams)
  conn.close()
  res
}

@atamborrino
Copy link
Author

Done.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment