Last active
August 29, 2015 14:14
-
-
Save dragisak/05329c03c4bb67d547b3 to your computer and use it in GitHub Desktop.
PostgreSQL journal for Akka Persistence in Play! app
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create tables for event sourcing.
# Play evolutions migration: journal table for Event Manager domain events.
# One row per persisted event; (event_id, seq) mirrors Akka Persistence's
# (persistenceId, sequenceNr) pair.

# --- !Ups

CREATE TABLE events
(
  event_id uuid NOT NULL,                            -- aggregate (event) UUID, extracted from the persistenceId
  seq bigint NOT NULL,                               -- Akka Persistence sequence number
  event_type character varying(12) NOT NULL,         -- discriminator, see EventType in PostgresJournal ("CREATE", "TITLE", ...)
  user_id bigint NOT NULL,
  org_id bigint NOT NULL,
  payload json,                                      -- variable, event-type-specific part; may be NULL
  deleted boolean NOT NULL DEFAULT false,            -- soft-delete flag used by non-permanent deleteMessagesTo
  created_at timestamp NOT NULL,
  CONSTRAINT events_pk PRIMARY KEY (event_id, seq)
)
WITH (
  OIDS=FALSE
);
COMMENT ON COLUMN events.seq IS 'Event log sequence number. Unique and increasing for event_id.';
COMMENT ON COLUMN events.event_type IS 'Defines what type of event this row is';
COMMENT ON COLUMN events.user_id IS 'ID of the user performing the operation';
COMMENT ON COLUMN events.org_id IS 'ID of the org this event belongs to';
COMMENT ON COLUMN events.payload IS 'Variable part of event sourcing event. Optional.';
COMMENT ON COLUMN events.deleted IS 'Flag that allows us mark events as deleted. Akka persistence will ignore deleted events.';

# --- !Downs

DROP TABLE events;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create tables for event sourcing.
# Play evolutions migration: journal table for Akka system/sharding messages.
# These message classes are private to Akka, so the payload is stored as an
# opaque serialized byte array rather than typed columns.

# --- !Ups

CREATE TABLE akka_system_events
(
  persistence_id character varying(255) NOT NULL,    -- raw Akka persistenceId
  seq bigint NOT NULL,                               -- Akka Persistence sequence number
  payload bytea NOT NULL,                            -- message serialized with Akka's default serializer
  deleted boolean NOT NULL DEFAULT false,            -- soft-delete flag used by non-permanent deleteMessagesTo
  created_at timestamp NOT NULL,
  CONSTRAINT system_events_pk PRIMARY KEY (persistence_id, seq)
)
WITH (
  OIDS=FALSE
);

# --- !Downs

DROP TABLE akka_system_events;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package persistence | |
import java.sql.{PreparedStatement, ResultSet} | |
import java.util.UUID | |
import util.Time | |
import actor.EventActor | |
import akka.persistence._ | |
import akka.persistence.journal.SyncWriteJournal | |
import akka.serialization.SerializationExtension | |
import domain.events.{EventDraftCreated, Evt, TitleUpdated} | |
import org.joda.time.DateTime | |
import org.joda.time.format.DateTimeFormat | |
import org.postgresql.util.PGobject | |
import play.api.Logger | |
import play.api.Play.current | |
import play.api.db._ | |
import play.api.libs.concurrent.Akka | |
import play.api.libs.concurrent.Execution.Implicits.defaultContext | |
import play.api.libs.json._ | |
import scala.collection.immutable.Seq | |
import scala.concurrent.Future | |
import scala.language.postfixOps | |
import scala.util._ | |
/**
 * Synchronous Akka Persistence journal backed by PostgreSQL, using Play's DB API.
 *
 * Two kinds of persistent messages are handled, routed by
 * `EventActor.isEventStateMessage(persistenceId)`:
 *  - Event Manager domain events (`Evt`) go to the `events` table with typed
 *    columns plus an optional JSON payload.
 *  - Everything else (e.g. Akka sharding/system messages) is serialized to a
 *    byte array and stored in `akka_system_events`.
 */
class PostgresJournal extends SyncWriteJournal {

  /** Discriminator values stored in `events.event_type` (varchar(12)). */
  private object EventType {
    final val EventDraftCreated = "CREATE"
    final val TitleUpdated = "TITLE"
  }

  private lazy val serialization = SerializationExtension(Akka.system)

  /**
   * Akka system messages are private classes. All we can do is serialize them
   * as byte arrays using Akka's default serializer for `AnyRef`.
   */
  private lazy val defaultSerializer = serialization.findSerializerFor(classOf[AnyRef])

  /** Pattern used when binding Joda DateTime values as Postgres `timestamp`s. */
  private val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")

  /** Wraps a Joda DateTime in a PGobject so it binds to a `timestamp` column. */
  private def timestampObject(ts: DateTime): PGobject = {
    val obj = new PGobject()
    obj.setType("timestamp")
    obj.setValue(ts.toString(dateFormatter))
    obj
  }

  /**
   * Writes a batch of messages in one transaction using a single JDBC batch.
   *
   * NOTE(review): the target table is chosen from the *first* message's
   * persistenceId, assuming all messages in one batch are of the same kind
   * (domain vs. system). Confirm this batching guarantee holds for the Akka
   * version in use before mixing message kinds.
   */
  override def writeMessages(messages: Seq[PersistentRepr]): Unit = if (messages.nonEmpty) {
    DB.withTransaction { conn =>
      val pstmt = if (EventActor.isEventStateMessage(messages.head.persistenceId)) {
        conn.prepareStatement(insertEventSql)
      } else {
        conn.prepareStatement(insertSystemEventSql)
      }
      messages.foreach { msg =>
        // Try the typed domain-event binder first, fall back to the opaque
        // byte-array binder (which itself warns on anything it cannot handle).
        val p: PartialFunction[Any, Unit] = bindEventManagerEvt(msg, pstmt) orElse bindSystemEvent(msg, pstmt)
        p(msg.payload)
        pstmt.addBatch()
      }
      pstmt.executeBatch()
    }
  }

  private val insertEventSql = "INSERT INTO events(event_id, seq, event_type, org_id, payload, user_id, created_at) VALUES (?,?,?,?,?,?,?)"

  /**
   * Binds Event Manager event messages to `insertEventSql` parameters.
   * Undefined for payloads that are not known `Evt` subtypes, so it can be
   * composed with `bindSystemEvent` via `orElse`.
   */
  private def bindEventManagerEvt(msg: PersistentRepr, pstmt: PreparedStatement): PartialFunction[Any, Unit] = {
    case evt: TitleUpdated =>
      pstmt.setObject(1, evt.eventId)
      pstmt.setLong(2, msg.sequenceNr)
      pstmt.setString(3, EventType.TitleUpdated)
      pstmt.setLong(4, evt.orgId)
      // Variable part of the event goes into the `json` payload column.
      val jsonObject = new PGobject()
      jsonObject.setType("json")
      jsonObject.setValue( s"""{"title":"${evt.title}"}""")
      pstmt.setObject(5, jsonObject)
      pstmt.setLong(6, evt.userId)
      pstmt.setObject(7, timestampObject(evt.createdAt))
    case evt: EventDraftCreated =>
      pstmt.setObject(1, evt.eventId)
      pstmt.setLong(2, msg.sequenceNr)
      pstmt.setString(3, EventType.EventDraftCreated)
      pstmt.setLong(4, evt.orgId)
      pstmt.setObject(5, null) // draft creation carries no variable payload
      pstmt.setLong(6, evt.userId)
      pstmt.setObject(7, timestampObject(evt.createdAt))
  }

  private val insertSystemEventSql = "INSERT INTO akka_system_events(persistence_id, seq, payload, created_at) VALUES (?,?,?,?)"

  /**
   * Binds Akka system/sharding messages (serialized as byte arrays) to
   * `insertSystemEventSql` parameters. The catch-all case only logs; such a
   * message is effectively dropped from the batch.
   */
  private def bindSystemEvent(msg: PersistentRepr, pstmt: PreparedStatement): PartialFunction[Any, Unit] = {
    case systemMessage: AnyRef =>
      val bytes = defaultSerializer.toBinary(systemMessage)
      pstmt.setString(1, msg.persistenceId)
      pstmt.setLong(2, msg.sequenceNr)
      pstmt.setBytes(3, bytes)
      pstmt.setObject(4, timestampObject(Time.now))
    case x => Logger.warn(s"Unhandled payload $x will not be persisted")
  }

  private val deleteEventsSql = "DELETE FROM events WHERE event_id = ? AND seq <= ?"
  private val markEventsAsDeletedSql = "UPDATE events SET deleted = true WHERE event_id = ? AND seq <= ?"
  private val deleteSystemEventsSql = "DELETE FROM akka_system_events WHERE persistence_id = ? AND seq <= ?"
  private val markSystemEventsAsDeletedSql = "UPDATE akka_system_events SET deleted = true WHERE persistence_id = ? AND seq <= ?"

  /**
   * Deletes (permanent) or soft-deletes (sets `deleted = true`) all messages
   * of `persistenceId` up to and including `toSequenceNr`.
   */
  override def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = {
    DB.withTransaction { conn =>
      if (EventActor.isEventStateMessage(persistenceId)) {
        // Domain events are keyed by the UUID embedded in the persistenceId.
        val eventId = EventActor.extractId(persistenceId)
        val sql = if (permanent) deleteEventsSql else markEventsAsDeletedSql
        val pstmt = conn.prepareStatement(sql)
        pstmt.setObject(1, eventId)
        pstmt.setLong(2, toSequenceNr)
        pstmt.executeUpdate()
      } else {
        val sql = if (permanent) deleteSystemEventsSql else markSystemEventsAsDeletedSql
        val pstmt = conn.prepareStatement(sql)
        pstmt.setString(1, persistenceId)
        pstmt.setLong(2, toSequenceNr)
        pstmt.executeUpdate()
      }
    }
  }

  @deprecated("deleteMessages will be removed.", since = "2.3.4")
  override def deleteMessages(messageIds: Seq[PersistentId], permanent: Boolean): Unit = ()

  @deprecated("writeConfirmations will be removed, since Channels will be removed.", since = "2.3.4")
  override def writeConfirmations(confirmations: Seq[PersistentConfirmation]): Unit = ()

  private val selectMaxSeqSql = "SELECT max(seq) FROM events WHERE event_id = ? AND seq >= ?"
  private val selectSystemMaxSeqSql = "SELECT max(seq) FROM akka_system_events WHERE persistence_id = ? AND seq >= ?"

  /**
   * Returns the highest stored sequence number for `persistenceId` that is
   * >= `fromSequenceNr`, or `fromSequenceNr` itself when no row matches
   * (max(seq) over an empty set yields SQL NULL, detected via wasNull).
   */
  override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = Future {
    DB.withConnection { conn =>
      val pstmt = if (EventActor.isEventStateMessage(persistenceId)) {
        val eventId = EventActor.extractId(persistenceId)
        val stmt = conn.prepareStatement(selectMaxSeqSql)
        stmt.setObject(1, eventId)
        stmt
      } else {
        val stmt = conn.prepareStatement(selectSystemMaxSeqSql)
        stmt.setString(1, persistenceId)
        stmt
      }
      pstmt.setLong(2, fromSequenceNr)
      val resultSet = pstmt.executeQuery()
      if (resultSet.next()) {
        val maxSeq = resultSet.getLong(1)
        if (resultSet.wasNull()) {
          fromSequenceNr
        } else {
          maxSeq
        }
      } else {
        fromSequenceNr
      }
    }
  }

  private val selectEventSql = "SELECT event_id, seq, event_type, org_id, payload, user_id, created_at, deleted FROM events WHERE event_id = ? AND seq >= ? AND seq <= ? ORDER BY seq LIMIT ?"
  private val selectSystemEventSql = "SELECT persistence_id, seq, payload, created_at, deleted FROM akka_system_events WHERE persistence_id = ? AND seq >= ? AND seq <= ? ORDER BY seq LIMIT ?"

  /**
   * Replays at most `max` stored messages of `persistenceId` with sequence
   * numbers in [fromSequenceNr, toSequenceNr], in ascending order, invoking
   * `replayCallback` for each successfully deserialized message.
   */
  override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = Future {
    DB.withConnection {
      conn =>
        if (EventActor.isEventStateMessage(persistenceId)) {
          val eventId = EventActor.extractId(persistenceId)
          val pstmt = conn.prepareStatement(selectEventSql)
          pstmt.setObject(1, eventId)
          pstmt.setLong(2, fromSequenceNr)
          pstmt.setLong(3, toSequenceNr)
          pstmt.setLong(4, max)
          replayResultSet(persistenceId, pstmt.executeQuery(), eventManagerEvent)(replayCallback)
        } else {
          val pstmt = conn.prepareStatement(selectSystemEventSql)
          pstmt.setString(1, persistenceId)
          pstmt.setLong(2, fromSequenceNr)
          pstmt.setLong(3, toSequenceNr)
          pstmt.setLong(4, max)
          replayResultSet(persistenceId, pstmt.executeQuery(), systemEvent)(replayCallback)
        }
    }
  }

  /**
   * Walks a replay result set, reconstructing a PersistentRepr per row.
   * Rows whose payload cannot be deserialized are logged and skipped.
   */
  private def replayResultSet(persistenceId: String, resultSet: ResultSet, getPayloadFrom: ResultSet => Try[AnyRef])(replayCallback: (PersistentRepr) => Unit) = {
    while (resultSet.next()) {
      getPayloadFrom(resultSet) match {
        case Success(pl) =>
          // Read shared columns by label: positional indexes differ between the
          // two queries (`deleted` is column 8 in selectEventSql but column 5 in
          // selectSystemEventSql), so a fixed index would read the wrong column
          // for domain events.
          val seqNr = resultSet.getLong("seq")
          val isDeleted = resultSet.getBoolean("deleted")
          replayCallback(PersistentRepr(
            payload = pl,
            sequenceNr = seqNr,
            persistenceId = persistenceId, // without this, replayed reprs carry no persistenceId
            deleted = isDeleted
          ))
        case Failure(err) =>
          Logger.error(s"Failed to deserialize payload from $persistenceId Skipping ...", err)
      }
    }
  }

  /**
   * Extracts an Event Manager event from a `selectEventSql` row, dispatching
   * on the `event_type` discriminator. The entire extraction (including JSON
   * parsing and casts) runs inside Try so that a corrupt row surfaces as a
   * Failure handled by replayResultSet instead of aborting the whole replay.
   */
  private val eventManagerEvent: ResultSet => Try[Evt] = resultSet => Try {
    resultSet.getString("event_type") match {
      case EventType.TitleUpdated =>
        val json = Json.parse(resultSet.getString("payload"))
        TitleUpdated(
          eventId = resultSet.getObject("event_id").asInstanceOf[UUID],
          orgId = resultSet.getLong("org_id"),
          userId = resultSet.getLong("user_id"),
          title = (json \ "title").as[String],
          createdAt = new DateTime(resultSet.getObject("created_at"))
        )
      case EventType.EventDraftCreated =>
        EventDraftCreated(
          eventId = resultSet.getObject("event_id").asInstanceOf[UUID],
          orgId = resultSet.getLong("org_id"),
          userId = resultSet.getLong("user_id"),
          createdAt = new DateTime(resultSet.getObject("created_at"))
        )
      case unknown =>
        throw new IllegalArgumentException(s"Unknown event type $unknown")
    }
  }

  /**
   * De-serialization for Akka system/sharding messages: the opaque `payload`
   * byte array is decoded with Akka's default serializer.
   */
  private val systemEvent: ResultSet => Try[AnyRef] = resultSet => {
    val bytes = resultSet.getBytes(3)
    Try(defaultSerializer.fromBinary(bytes))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment