Skip to content

Instantly share code, notes, and snippets.

@umbreak
Last active December 21, 2017 15:15
Show Gist options
  • Save umbreak/99bb693039b21fc1dea6a113ea35022e to your computer and use it in GitHub Desktop.
Save umbreak/99bb693039b21fc1dea6a113ea35022e to your computer and use it in GitHub Desktop.
KG migration v0.8
// Build definition for the KG v0.8 event migration. The migration itself is written as a
// ScalaTest spec, so every dependency is only needed on the Test classpath.
lazy val root = project
  .in(file("."))
  .settings(
    name        := "migration",
    description := "Migration of data in the primary store",
    licenses    := Seq("Apache 2.0" -> new URL("https://github.com/BlueBrain/nexus-kg/blob/master/LICENSE")),
    libraryDependencies ++= Seq(
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.14",
      "com.typesafe.akka"  %% "akka-testkit"                  % "2.5.4",
      "org.scalatest"      %% "scalatest"                     % "3.0.4"
    ).map(_ % Test)
  )
import java.util.regex.Pattern
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.Sink
import akka.testkit.TestKit
import com.datastax.driver.core._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
/**
 * One-off migration (KG v0.8) of the events stored in `kg.messages`.
 *
 * The event text is checked for the legacy JSON keys `"size"`, `"contentType"` and `"alg"`.
 * Every event containing all three is rewritten in place, with those keys renamed to
 * `"contentSize"`, `"mediaType"` and `"algorithm"`.
 *
 * The migration is packaged as a ScalaTest spec so it can be launched with `sbt test`, but it
 * issues real UPDATE statements against the Cassandra cluster it connects to. The contact point
 * and credentials below are placeholders to fill in before running.
 */
class MigrationSpec
    extends TestKit(ActorSystem("MigrationSpec"))
    with WordSpecLike
    with Matchers
    with ScalaFutures
    with BeforeAndAfterAll {

  /** Terminates the TestKit actor system once all tests in this spec have run. */
  override protected def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  "A Migration" should {
    implicit val mat: ActorMaterializer = ActorMaterializer()

    "migrate events from the primary store" in {
      val cluster = Cluster.builder
        .addContactPoint("host")
        .withPort(9042)
        .withCredentials("", "")
        .build

      // The cluster (and with it every session it opened) is closed even if the migration fails.
      try {
        // Required implicitly by every CassandraSource below.
        implicit val session: Session = cluster.connect()

        val countStmt = new SimpleStatement("SELECT count(*) as count FROM kg.messages")

        // Number of rows in kg.messages; only used for the final report.
        def count(): Future[Long] =
          CassandraSource(countStmt)
            .map(row => row.getLong("count"))
            .runWith(Sink.head)

        // Only events carrying all three legacy keys are migrated.
        def isCandidate(event: String): Boolean =
          event.contains(""""size"""") && event.contains(""""contentType"""") && event.contains(""""alg"""")

        // Prepared once and reused for every row. The key values are bound rather than
        // interpolated into the CQL text, so each row does not prepare a new statement, and
        // quotes inside a persistence id cannot alter the query.
        val update: PreparedStatement = session.prepare(
          "UPDATE kg.messages SET event = textAsBlob(?) WHERE persistence_id = ? AND partition_nr = ? " +
            "AND sequence_nr = ? AND timebucket = ? AND timestamp = ?")

        // Builds the UPDATE for a row whose event must be migrated. Returns None for
        // non-candidate events and for rows whose event column is null.
        def genStatement(row: Row): Option[BoundStatement] =
          Option(row.getString("event")).filter(isCandidate).map { str =>
            val persId = row.getString("persistence_id")
            val event = str
              .replaceAll(Pattern.quote(""""size""""), """"contentSize"""")
              .replaceAll(Pattern.quote(""""contentType""""), """"mediaType"""")
              .replaceAll(Pattern.quote(""""alg""""), """"algorithm"""")
            println(s"Changing event with persistence_id '$persId'")
            update
              .bind()
              .setString(0, event)
              .setString(1, persId)
              .setLong(2, row.getLong("partition_nr"))
              .setLong(3, row.getLong("sequence_nr"))
              .setString(4, row.getString("timebucket"))
              .setUUID(5, row.getUUID("timestamp"))
          }

        val stmt = new SimpleStatement(
          "SELECT persistence_id,partition_nr,sequence_nr,timestamp,timebucket,blobAsText(event) as event FROM kg.messages")
          .setFetchSize(2000)

        // Updates are executed synchronously, one at a time, while the stream is folded. The
        // result is the number of events rewritten.
        val futureUpdates: Future[Long] = CassandraSource(stmt)
          .map(genStatement)
          .collect { case Some(statement) => statement }
          .runFold(0L) { (modified, statement) =>
            session.execute(statement)
            modified + 1
          }

        val c = Await.result(count(), 1.minute)
        val total = Await.result(futureUpdates, 200.minutes)
        println(s"=============> Events modified: $total")
        println(s"=============> Total events: $c")
      } finally cluster.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment