Skip to content

Instantly share code, notes, and snippets.

@amitayh
Last active April 27, 2022 06:35
Show Gist options
  • Save amitayh/e4ad769652efee4c6dc5 to your computer and use it in GitHub Desktop.
Save amitayh/e4ad769652efee4c6dc5 to your computer and use it in GitHub Desktop.
package com.wixpress.quotes.common
import java.util.UUID
object CQRS {
////////////////////////////////////////////////////////////////////////////////
// Example application
////////////////////////////////////////////////////////////////////////////////
def main(args: Array[String]) {
val commandBus = new CommandBus[QuoteCommand]
val eventBus = new EventBus[QuoteEvent]
val eventStore = new InMemoryEventStore[QuoteEvent]
val repository = new QuoteRepository(eventStore)
val handler = new QuoteCommandHandler(repository, eventBus)
val listView = new ListViewDenormalizer
val mailer = new Mailer(commandBus)
commandBus.subscribe(handler)
eventBus.subscribe(listView)
eventBus.subscribe(mailer)
val quoteId1 = UUID.randomUUID.toString
commandBus.publish(CreateNewQuote(quoteId1))
commandBus.publish(AddItemToQuote(quoteId1, QuoteItem("something", 5.5)))
commandBus.publish(AddItemToQuote(quoteId1, QuoteItem("something else", 6)))
commandBus.publish(SetQuoteDiscount(quoteId1, 10))
val quoteId2 = UUID.randomUUID.toString
commandBus.publish(CreateNewQuote(quoteId2))
commandBus.publish(AddItemToQuote(quoteId2, QuoteItem("foobar", 12.3)))
commandBus.publish(SendQuoteToClient(quoteId2, "client@company.com"))
println("--------------------")
listView.getAll.foreach(println)
}
////////////////////////////////////////////////////////////////////////////////
// Types
////////////////////////////////////////////////////////////////////////////////
type AggregateId = String
type AggregateVersion = Long
type EventStream[E] = List[E]
////////////////////////////////////////////////////////////////////////////////
// Interfaces
////////////////////////////////////////////////////////////////////////////////
trait AggregateRepository[T] {
def load(aggregateId: AggregateId): Option[T]
def save(aggregate: T): Unit
}
trait EventHandler[E, T] {
def handle: PartialFunction[E, T]
}
trait CommandHandler[C] {
def handle: PartialFunction[C, Unit]
}
trait AggregateRoot {
def id: AggregateId
def version: AggregateVersion
}
trait EventStore[E] {
def getEventsForAggregate(id: AggregateId): EventStream[E]
def appendEvents(id: AggregateId, events: EventStream[E]): Unit
}
trait MessageBus[M, H] {
def subscribe(handler: H)
def publish(message: M): Unit
def publish(messages: EventStream[M]): Unit = messages.foreach(publish)
}
////////////////////////////////////////////////////////////////////////////////
// Event sourcing
////////////////////////////////////////////////////////////////////////////////
trait EventSourcedAggregateRoot[E, T] extends AggregateRoot with EventHandler[E, EventSourcedAggregateRoot[E, T]]
trait EventSourcedAggregateRepository[E, T] extends AggregateRepository[EventSourcedAggregateRoot[E, T]] {
type AggregateType = EventSourcedAggregateRoot[E, T]
def eventStore: EventStore[E]
def empty: AggregateType
private var uncommittedEvents: EventStream[E] = Nil
override def load(aggregateId: AggregateId): Option[AggregateType] = {
eventStore.getEventsForAggregate(aggregateId) match {
case Nil => None
case events =>
val aggregate = events.foldLeft(empty)((prev, event) => prev.handle(event))
Some(aggregate)
}
}
override def save(aggregate: AggregateType): Unit = {
eventStore.appendEvents(aggregate.id, uncommittedEvents)
uncommittedEvents = Nil
}
def appendEvents(events: EventStream[E]): Unit = {
uncommittedEvents ++= events
}
}
////////////////////////////////////////////////////////////////////////////////
// Message busses
////////////////////////////////////////////////////////////////////////////////
class EventBus[E] extends MessageBus[E, EventHandler[E, _]] {
type HandlerType = EventHandler[E, _]
private var handlers: List[HandlerType] = Nil
override def subscribe(handler: HandlerType): Unit = {
handlers = handler :: handlers
}
override def publish(event: E): Unit = {
handlers.filter(_.handle.isDefinedAt(event)).foreach(_.handle(event))
}
}
class CommandBus[C] extends MessageBus[C, CommandHandler[C]] {
type HandlerType = CommandHandler[C]
private var handlers: List[HandlerType] = Nil
override def subscribe(handler: HandlerType): Unit = {
handlers = handler :: handlers
}
override def publish(command: C): Unit = {
handlers.find(_.handle.isDefinedAt(command)).foreach(_.handle(command))
}
}
class InMemoryEventStore[E] extends EventStore[E] {
private var events: Map[AggregateId, EventStream[E]] = Map.empty
override def getEventsForAggregate(aggregateId: AggregateId): EventStream[E] = {
events.getOrElse(aggregateId, List())
}
override def appendEvents(aggregateId: AggregateId, newEvents: EventStream[E]): Unit = {
// TODO: concurrent modification validation
val oldEvents: EventStream[E] = getEventsForAggregate(aggregateId)
events = events.updated(aggregateId, oldEvents ++ newEvents)
}
}
////////////////////////////////////////////////////////////////////////////////
// Events
////////////////////////////////////////////////////////////////////////////////
sealed trait QuoteCommand {
def quoteId: AggregateId
}
case class CreateNewQuote(quoteId: AggregateId) extends QuoteCommand
case class AddItemToQuote(quoteId: AggregateId, item: QuoteItem) extends QuoteCommand
case class SetQuoteDiscount(quoteId: AggregateId, discount: Double) extends QuoteCommand
case class SendQuoteToClient(quoteId: AggregateId, email: String) extends QuoteCommand
case class MarkQuoteAsSent(quoteId: AggregateId) extends QuoteCommand
////////////////////////////////////////////////////////////////////////////////
// Commands
////////////////////////////////////////////////////////////////////////////////
sealed trait QuoteEvent {
def quoteId: AggregateId
}
case class NewQuoteCreated(quoteId: AggregateId) extends QuoteEvent
case class ItemAddedToQuote(quoteId: AggregateId, item: QuoteItem) extends QuoteEvent
case class QuoteDiscountSet(quoteId: AggregateId, discount: Double) extends QuoteEvent
case class QuoteSentToClient(quoteId: AggregateId, email: String) extends QuoteEvent
case class QuoteMarkedAsSent(quoteId: AggregateId) extends QuoteEvent
////////////////////////////////////////////////////////////////////////////////
// Aggregates
////////////////////////////////////////////////////////////////////////////////
case class Quote(id: AggregateId, version: AggregateVersion, items: List[QuoteItem], discount: Double)
extends EventSourcedAggregateRoot[QuoteEvent, Quote] {
override def handle: PartialFunction[QuoteEvent, Quote] = {
case NewQuoteCreated(quoteId) => copy(id = quoteId, version = version + 1)
case ItemAddedToQuote(_, item) => copy(items = items :+ item, version = version + 1)
case QuoteDiscountSet(_, rate) => copy(discount = rate, version = version + 1)
case _ => copy(version = version + 1)
}
}
object Quote {
def empty: Quote = Quote("", 0, Nil, 0)
}
case class QuoteItem(name: String, price: Double)
////////////////////////////////////////////////////////////////////////////////
// Wiring
////////////////////////////////////////////////////////////////////////////////
class QuoteCommandHandler(repository: EventSourcedAggregateRepository[QuoteEvent, Quote],
eventBus: EventBus[QuoteEvent])
extends CommandHandler[QuoteCommand] {
override def handle = new PartialFunction[QuoteCommand, Unit] {
override def isDefinedAt(command: QuoteCommand): Boolean = {
getEvent.isDefinedAt(command)
}
override def apply(command: QuoteCommand): Unit = {
val quote = repository.load(command.quoteId).getOrElse(repository.empty)
val event = getEvent(command)
val updatedQuote = quote.handle(event)
repository.appendEvents(List(event))
repository.save(updatedQuote)
eventBus.publish(event)
}
}
def getEvent: PartialFunction[QuoteCommand, QuoteEvent] = {
case CreateNewQuote(id) => NewQuoteCreated(id)
case AddItemToQuote(id, item) => ItemAddedToQuote(id, item)
case SetQuoteDiscount(id, discount) => QuoteDiscountSet(id, discount)
case SendQuoteToClient(id, email) => QuoteSentToClient(id, email)
case MarkQuoteAsSent(id) => QuoteMarkedAsSent(id)
}
}
class QuoteRepository(override val eventStore: EventStore[QuoteEvent])
extends EventSourcedAggregateRepository[QuoteEvent, Quote] {
override def empty: AggregateType = Quote.empty
}
case class QuoteSummaryDto(quoteId: AggregateId, total: Double = 0, status: String = "Draft")
class ListViewDenormalizer extends EventHandler[QuoteEvent, Unit] {
private var list: Map[AggregateId, QuoteSummaryDto] = Map.empty
override def handle: PartialFunction[QuoteEvent, Unit] = {
case NewQuoteCreated(quoteId) =>
list += (quoteId -> QuoteSummaryDto(quoteId))
case ItemAddedToQuote(quoteId, item) =>
val updated = list.get(quoteId).map(old => old.copy(total = old.total + item.price))
list += (quoteId -> updated.get)
case QuoteMarkedAsSent(quoteId) =>
val updated = list.get(quoteId).map(old => old.copy(status = "Sent"))
list += (quoteId -> updated.get)
}
def getAll = list.values
}
class Mailer(commandBus: CommandBus[QuoteCommand]) extends EventHandler[QuoteEvent, Unit] {
override def handle: PartialFunction[QuoteEvent, Unit] = {
case QuoteSentToClient(quoteId, email) =>
println(s"Mailing quote $quoteId to $email...")
commandBus.publish(MarkQuoteAsSent(quoteId))
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment