Created
November 21, 2017 15:04
-
-
Save umbreak/6e385e85f2d50aca3247a8844e7b08bd to your computer and use it in GitHub Desktop.
KG migration v0.7.6 and IAM migration v0.3.4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Root build definition for the migration project. Every library dependency is
// Test-scoped: the only code in this project is the migration spec.
lazy val root = (project in file("."))
  .settings(
    name        := "migration",
    description := "Migration of data in the primary store",
    licenses    := Seq("Apache 2.0" -> url("https://github.com/BlueBrain/nexus-kg/blob/master/LICENSE")),
    libraryDependencies ++= Seq(
      // Cassandra streaming source used to read the rows to migrate
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.14"  % Test,
      "com.typesafe.akka"  %% "akka-testkit"                  % "2.5.4" % Test,
      // JSON parsing and merging of the stored events
      "io.circe"           %% "circe-core"                    % "0.8.0" % Test,
      "io.circe"           %% "circe-parser"                  % "0.8.0" % Test,
      "org.scalatest"      %% "scalatest"                     % "3.0.4" % Test
    )
  )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.{Date, UUID} | |
import akka.actor.ActorSystem | |
import akka.event.Logging | |
import akka.stream.ActorMaterializer | |
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource | |
import akka.testkit.TestKit | |
import com.datastax.driver.core._ | |
import org.scalatest.{Matchers, WordSpecLike} | |
import io.circe.parser._ | |
import MigrationSpec._ | |
import akka.stream.scaladsl.Sink | |
import io.circe.Json | |
import org.scalatest.concurrent.ScalaFutures | |
import scala.concurrent.duration._ | |
import cats.syntax.either._ | |
import scala.concurrent.{Await, Future} | |
/**
  * One-off migration, packaged as a test, that rewrites every row of `kg.messages`.
  *
  * Each stored event is parsed as JSON and deep-merged with a `meta` object that carries
  * an anonymous author and an `instant` derived from the row's `timestamp` UUID. The
  * merged JSON is then written back to the same row.
  *
  * Connects to a Cassandra node on 127.0.0.1:9042; the cluster connection and the actor
  * system are released once the suite has finished.
  */
class MigrationSpec
    extends TestKit(ActorSystem("MigrationSpec"))
    with WordSpecLike
    with Matchers
    with ScalaFutures
    with org.scalatest.BeforeAndAfterAll {

  private implicit val session: Session =
    Cluster.builder.addContactPoint("127.0.0.1").withPort(9042).withCredentials("", "").build.connect()
  private implicit val mat: ActorMaterializer = ActorMaterializer()
  private val logger                          = Logging(system, getClass)

  /** Releases the Cassandra connection and the actor system so the JVM can exit cleanly. */
  override protected def afterAll(): Unit = {
    try session.getCluster.close() // closing the cluster also closes its sessions
    finally TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  "A Migration" should {

    /** Builds the `meta` JSON fragment whose `instant` is derived from the passed UUID. */
    def meta(uuid: UUID) =
      parse(
        s"""
           |{
           |  "meta": {
           |    "author": {
           |      "id": "anonymous",
           |      "type": "Anonymous"
           |    },
           |    "instant": "${formattedTimeFrom(uuid)}"
           |  }
           |}
         """.stripMargin)

    "migrate events from the primary store" in {
      val countStmt = new SimpleStatement("SELECT count(*) as count FROM kg.messages")

      // Prepared once and re-used for every row. Key values are bound rather than
      // interpolated into the CQL text, so quotes inside a persistence id or time
      // bucket cannot corrupt the query and the server does not re-prepare per row.
      val updateStmt: PreparedStatement = session.prepare(
        "UPDATE kg.messages SET event = textAsBlob(?) " +
          "WHERE persistence_id = ? AND partition_nr = ? AND sequence_nr = ? AND timebucket = ? AND timestamp = ?")

      /** Total number of rows reported by Cassandra for `kg.messages`. */
      def count(): Future[Long] =
        CassandraSource(countStmt)
          .map(row => row.getLong("count"))
          .runWith(Sink.last)

      /** Parses the `event` column of the row as JSON. */
      def eventJson(row: Row): Either[String, Json] = {
        val strEv = row.getString("event")
        parse(strEv).leftMap(_ => s"Could not parse event '$strEv'")
      }

      /** Deep-merges the `meta` fragment for the row's `timestamp` into the event. */
      def updatedEvent(row: Row, event: Json): Either[String, Json] = {
        val uuid = row.getUUID("timestamp")
        meta(uuid)
          .map(metaJson => event deepMerge metaJson)
          .leftMap(_ => s"Could not parse event meta for uuid '$uuid'")
      }

      /** Binds the updated event plus the row's primary key columns to the prepared update. */
      def genStatement(row: Row, updatedEvent: Json): BoundStatement =
        updateStmt.bind(
          updatedEvent.noSpaces,
          row.getString("persistence_id"),
          Long.box(row.getLong("partition_nr")),
          Long.box(row.getLong("sequence_nr")),
          row.getString("timebucket"),
          row.getUUID("timestamp")
        )

      val stmt = new SimpleStatement(
        "SELECT persistence_id,partition_nr,sequence_nr,timestamp,timebucket,blobAsText(event) as event FROM kg.messages")
        .setFetchSize(20)

      // Every row read is counted, including rows whose event could not be parsed:
      // those are logged and left untouched.
      val futureUpdates = CassandraSource(stmt).runFold(0L) {
        case (count, row) =>
          val prepared = for {
            evt <- eventJson(row)
            upd <- updatedEvent(row, evt)
          } yield genStatement(row, upd)
          prepared match {
            case Left(err)        => logger.error(err)
            case Right(statement) => session.execute(statement)
          }
          count + 1
      }

      val c     = Await.result(count(), 1.minute)
      val total = Await.result(futureUpdates, 1.minute)
      println(s"=============> Count: $c")
      println(s"=============> Total: $total")
      c shouldEqual total
    }
  }
}
/** Helpers for turning a time-based (version 1) UUID into an ISO-8601 UTC timestamp. */
object MigrationSpec {
  import java.time.{Instant, ZoneOffset}
  import java.time.format.DateTimeFormatter

  // https://github.com/rantav/hector/blob/master/core/src/main/java/me/prettyprint/cassandra/utils/TimeUUIDUtils.java
  // Offset, in 100ns units, between the UUID epoch (1582-10-15) and the Unix epoch.
  private val NUM_100NS_INTERVALS_SINCE_UUID_EPOCH = 0x01b21dd213814000L

  // The pattern ends in a literal 'Z', so the formatter is pinned to UTC; otherwise the
  // JVM default zone would be rendered and mislabelled as UTC. DateTimeFormatter is
  // immutable, so sharing it across threads is safe.
  private val format =
    DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC)

  /** Milliseconds since the Unix epoch encoded in the UUID's 100ns timestamp field. */
  private def getTimeFromUUID(uuid: UUID): Long =
    (uuid.timestamp - NUM_100NS_INTERVALS_SINCE_UUID_EPOCH) / 10000

  /**
    * Formats the instant embedded in a time-based UUID as `yyyy-MM-dd'T'HH:mm:ss.SSS'Z'` in UTC.
    *
    * @param uuid a version 1 UUID; `UUID.timestamp` throws `UnsupportedOperationException` for other versions
    * @return the UTC timestamp with millisecond precision
    */
  def formattedTimeFrom(uuid: UUID): String =
    format.format(Instant.ofEpochMilli(getTimeFromUUID(uuid)))
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment