Last active
December 21, 2017 15:15
-
-
Save umbreak/99bb693039b21fc1dea6a113ea35022e to your computer and use it in GitHub Desktop.
KG migration v0.8
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Single-module build for the "migration" project, rooted at the build's base directory.
lazy val root = (project in file("."))
  .settings(
    name        := "migration",
    description := "Migration of data in the primary store",
    licenses    := Seq("Apache 2.0" -> new URL("https://github.com/BlueBrain/nexus-kg/blob/master/LICENSE")),
    // Every dependency below is declared in the Test configuration only.
    libraryDependencies ++= Seq(
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.14"  % Test,
      "com.typesafe.akka"  %% "akka-testkit"                  % "2.5.4" % Test,
      "org.scalatest"      %% "scalatest"                     % "3.0.4" % Test
    )
  )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.regex.Pattern | |
import akka.actor.ActorSystem | |
import akka.stream.ActorMaterializer | |
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource | |
import akka.stream.scaladsl.Sink | |
import akka.testkit.TestKit | |
import com.datastax.driver.core._ | |
import org.scalatest.concurrent.ScalaFutures | |
import org.scalatest.{Matchers, WordSpecLike} | |
import scala.concurrent.duration._ | |
import scala.concurrent.{Await, Future} | |
/**
  * Migration of the events stored in the `kg.messages` Cassandra table, written as a ScalaTest spec.
  *
  * Every row whose event text contains all three quoted keys `"size"`, `"contentType"` and `"alg"`
  * is updated in place so that those keys read `"contentSize"`, `"mediaType"` and `"algorithm"`.
  * The spec prints how many events were modified and the total number of rows in the table.
  */
class MigrationSpec
    extends TestKit(ActorSystem("MigrationSpec"))
    with WordSpecLike
    with Matchers
    with ScalaFutures
    with org.scalatest.BeforeAndAfterAll {

  // NOTE(review): contact point and credentials look like placeholders — fill in before running.
  private implicit val session: Session = Cluster.builder
    .addContactPoint("host")
    .withPort(9042)
    .withCredentials("", "")
    .build
    .connect()

  private implicit val mat: ActorMaterializer = ActorMaterializer()

  /** Releases the Cassandra connection and stops the actor system once the spec has finished. */
  override protected def afterAll(): Unit =
    try {
      // Closing the cluster also closes the sessions that were created from it.
      session.getCluster.close()
    } finally {
      TestKit.shutdownActorSystem(system)
      super.afterAll()
    }

  "A Migration" should {

    "migrate events from the primary store" in {

      // Quoted key as currently stored -> quoted key after the migration. The opening quote is part
      // of the match, so an already migrated key such as "contentSize" is not matched by "size".
      val renames: Seq[(String, String)] = Seq(
        """"size""""        -> """"contentSize"""",
        """"contentType"""" -> """"mediaType"""",
        """"alg""""         -> """"algorithm""""
      )

      val countStmt = new SimpleStatement("SELECT count(*) as count FROM kg.messages")

      /** Total number of rows in `kg.messages`. */
      def count(): Future[Long] =
        CassandraSource(countStmt)
          .map(row => row.getLong("count"))
          .runWith(Sink.last)

      /** An event is migrated only when it contains every one of the old keys. */
      def isCandidate(event: String): Boolean =
        renames.forall { case (oldKey, _) => event.contains(oldKey) }

      /** Applies all the key renames, in declaration order, to the event text. */
      def rename(event: String): String =
        renames.foldLeft(event) {
          case (acc, (oldKey, newKey)) => acc.replaceAll(Pattern.quote(oldKey), newKey)
        }

      // Prepared once and re-bound for every row. All values travel as bind parameters, so
      // identifiers containing quotes cannot break (or inject into) the CQL text.
      val updateStmt: PreparedStatement = session.prepare(
        "UPDATE kg.messages SET event = textAsBlob(?) " +
          "WHERE persistence_id = ? AND partition_nr = ? AND sequence_nr = ? AND timebucket = ? AND timestamp = ?")

      /**
        * Builds the UPDATE for a row, when the row needs migrating.
        *
        * @param row a row exposing the key columns plus the event rendered as text
        * @return the bound UPDATE statement, or None when the event is missing or is not a candidate
        */
      def genStatement(row: Row): Option[BoundStatement] =
        Option(row.getString("event")).filter(isCandidate).map { str =>
          val persId     = row.getString("persistence_id")
          val partNr     = row.getLong("partition_nr")
          val seqNr      = row.getLong("sequence_nr")
          val timeBucket = row.getString("timebucket")
          val uuid       = row.getUUID("timestamp")
          println(s"Changing event with persistence_id '$persId'")
          // bind takes Java objects, hence the explicit boxing of the Long columns.
          updateStmt.bind(rename(str), persId, Long.box(partNr), Long.box(seqNr), timeBucket, uuid)
        }

      val stmt = new SimpleStatement(
        "SELECT persistence_id,partition_nr,sequence_nr,timestamp,timebucket,blobAsText(event) as event FROM kg.messages")
        .setFetchSize(2000)

      // Streams the whole table and counts the rows that were rewritten. Each UPDATE is executed
      // synchronously, so at most one write is in flight at any time.
      val futureUpdates = CassandraSource(stmt).runFold(0L) { (updated, row) =>
        genStatement(row).fold(updated) { bound =>
          session.execute(bound)
          updated + 1
        }
      }

      val c     = Await.result(count(), 1.minute)
      val total = Await.result(futureUpdates, 200.minutes)
      println(s"=============> Events modified: $total")
      println(s"=============> Total events: $c")
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment