Skip to content

Instantly share code, notes, and snippets.

@umbreak
Created November 21, 2017 15:04
Show Gist options
  • Save umbreak/6e385e85f2d50aca3247a8844e7b08bd to your computer and use it in GitHub Desktop.
Save umbreak/6e385e85f2d50aca3247a8844e7b08bd to your computer and use it in GitHub Desktop.
KG migration v0.7.6 and IAM migration v0.3.4
// sbt build for the "migration" project. Every library listed below is scoped to the
// Test configuration only.
lazy val root = project
  .in(file("."))
  .settings(
    name := "migration",
    description := "Migration of data in the primary store",
    libraryDependencies ++= {
      // Pinned versions; kept together so related artifacts (e.g. circe modules) stay in sync.
      val alpakkaVersion = "0.14"
      val akkaVersion = "2.5.4"
      val circeVersion = "0.8.0"
      val scalaTestVersion = "3.0.4"
      Seq(
        "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % alpakkaVersion,
        "com.typesafe.akka" %% "akka-testkit" % akkaVersion,
        "io.circe" %% "circe-core" % circeVersion,
        "io.circe" %% "circe-parser" % circeVersion,
        "org.scalatest" %% "scalatest" % scalaTestVersion
      ).map(_ % Test)
    },
    licenses := Seq("Apache 2.0" -> new URL("https://github.com/BlueBrain/nexus-kg/blob/master/LICENSE"))
  )
import java.util.{Date, UUID}
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.testkit.TestKit
import com.datastax.driver.core._
import org.scalatest.{Matchers, WordSpecLike}
import io.circe.parser._
import MigrationSpec._
import akka.stream.scaladsl.Sink
import io.circe.Json
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
import cats.syntax.either._
import scala.concurrent.{Await, Future}
/** Migration spec that rewrites every event stored in `kg.messages` in place.
  *
  * For each row it parses the `event` payload as JSON, deep-merges a `meta` object (an
  * anonymous author plus an `instant` derived from the row's time-UUID `timestamp`) into it,
  * and writes the merged JSON back to the same row. Rows whose payload or meta cannot be
  * parsed are logged and skipped, but are still counted. The test then checks that the
  * number of processed rows equals the table's `count(*)`.
  *
  * NOTE(review): needs a Cassandra node reachable at 127.0.0.1:9042 with a `kg.messages` table.
  */
class MigrationSpec
    extends TestKit(ActorSystem("MigrationSpec"))
    with WordSpecLike
    with Matchers
    with ScalaFutures
    with org.scalatest.BeforeAndAfterAll {

  // Shared by all streams below; the owning Cluster is closed in afterAll.
  private implicit val session: Session =
    Cluster.builder.addContactPoint("127.0.0.1").withPort(9042).withCredentials("", "").build.connect()
  private implicit val mat: ActorMaterializer = ActorMaterializer()
  private val logger = Logging(system, getClass)

  /** Releases the Cassandra connection and the actor system, even if one of them fails to close. */
  override protected def afterAll(): Unit =
    try session.getCluster.close()
    finally {
      try TestKit.shutdownActorSystem(system)
      finally super.afterAll()
    }

  "A Migration" should {

    /** JSON fragment merged into each event: anonymous author and the instant encoded in `uuid`. */
    def meta(uuid: UUID) =
      parse(s"""
        |{
        |  "meta": {
        |    "author": {
        |      "id": "anonymous",
        |      "type": "Anonymous"
        |    },
        |    "instant": "${formattedTimeFrom(uuid)}"
        |  }
        |}
        """.stripMargin)

    "migrate events from the primary store" in {
      val countStmt = new SimpleStatement("SELECT count(*) as count FROM kg.messages")

      /** Total number of rows in `kg.messages`. */
      def count(): Future[Long] =
        CassandraSource(countStmt)
          .map(row => row.getLong("count"))
          .runWith(Sink.last)

      // Prepared once and bound per row. This avoids a prepare round-trip per event, and it
      // keeps row values (e.g. persistence ids containing quotes) out of the CQL text.
      val updateStmt: PreparedStatement = session.prepare(
        "UPDATE kg.messages SET event = textAsBlob(?) " +
          "WHERE persistence_id = ? AND partition_nr = ? AND sequence_nr = ? AND timebucket = ? AND timestamp = ?")

      /** Parses the row's `event` column (selected as text) into JSON. */
      def eventJson(row: Row): Either[String, Json] = {
        val strEv = row.getString("event")
        parse(strEv).leftMap(_ => s"Could not parse event '$strEv'")
      }

      /** Deep-merges the `meta` fragment for the row's time-UUID into `event`. */
      def updatedEvent(row: Row, event: Json): Either[String, Json] = {
        val uuid = row.getUUID("timestamp")
        meta(uuid)
          .map(metaJson => event deepMerge metaJson)
          .leftMap(_ => s"Could not parse event meta for uuid '$uuid'")
      }

      /** Binds the update of `row`'s event to `updatedEvent`, keyed by the row's full primary key. */
      def genStatement(row: Row, updatedEvent: Json): BoundStatement =
        updateStmt.bind(
          updatedEvent.noSpaces,
          row.getString("persistence_id"),
          Long.box(row.getLong("partition_nr")),
          Long.box(row.getLong("sequence_nr")),
          row.getString("timebucket"),
          row.getUUID("timestamp"))

      val stmt = new SimpleStatement(
        "SELECT persistence_id,partition_nr,sequence_nr,timestamp,timebucket,blobAsText(event) as event FROM kg.messages")
        .setFetchSize(20)

      // Counts every visited row, whether or not its update succeeded; failures are only logged.
      val futureUpdates = CassandraSource(stmt).runFold(0L) {
        case (count, row) =>
          val prepared = for {
            evt <- eventJson(row)
            upd <- updatedEvent(row, evt)
          } yield genStatement(row, upd)
          prepared match {
            case Left(err)        => logger.error(err)
            case Right(statement) => session.execute(statement)
          }
          count + 1
      }

      val c = Await.result(count(), 1.minute)
      val total = Await.result(futureUpdates, 1.minute)
      println(s"=============> Count: $c")
      println(s"=============> Total: $total")
      c shouldEqual total
    }
  }
}
/** Helpers for turning time-based (version 1) UUIDs into ISO-8601 UTC timestamp strings. */
object MigrationSpec {
  import java.time.{Instant, ZoneOffset}
  import java.time.format.DateTimeFormatter

  // Number of 100-nanosecond intervals between the UUID epoch (1582-10-15T00:00Z) and the Unix epoch.
  // Source: https://github.com/rantav/hector/blob/master/core/src/main/java/me/prettyprint/cassandra/utils/TimeUUIDUtils.java
  private val Num100nsIntervalsSinceUuidEpoch = 0x01b21dd213814000L

  // The pattern ends in a literal 'Z', so output must be rendered in UTC. A zone-less
  // SimpleDateFormat would use the JVM default zone and mislabel local time as UTC.
  // DateTimeFormatter is also immutable and thread-safe.
  private val format =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC)

  /** Milliseconds since the Unix epoch encoded in the time-based `uuid` (floored, so pre-1970 values round down). */
  private def getTimeFromUUID(uuid: UUID): Long =
    java.lang.Math.floorDiv(uuid.timestamp - Num100nsIntervalsSinceUuidEpoch, 10000L)

  /** Formats the instant encoded in a time-based UUID as `yyyy-MM-dd'T'HH:mm:ss.SSS'Z'` in UTC.
    *
    * @param uuid a version 1 (time-based) UUID
    * @return the UUID's timestamp, e.g. `1970-01-01T00:00:00.000Z`
    * @throws UnsupportedOperationException if `uuid` is not a version 1 UUID (raised by `UUID.timestamp`)
    */
  def formattedTimeFrom(uuid: UUID): String =
    format.format(Instant.ofEpochMilli(getTimeFromUUID(uuid)))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment