Last active
December 12, 2023 01:41
-
-
Save SpareShade/d38ecc0202a87ec7439a00cf0d0f2577 to your computer and use it in GitHub Desktop.
GetUp - Event Sourcing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package eventsourcing provides an implementation of the caption repository using event sourcing. | |
package eventsourcing | |
import ( | |
"context" | |
"errors" | |
pkg_domain "bleyk.org/pkg/domain" | |
"bleyk.org/pkg/metadata" | |
"bleyk.org/services/artwork/internal/domain" | |
domain_caption "bleyk.org/services/artwork/internal/domain/caption" | |
) | |
// captionRepository is an implementation of the domain_caption.Repository interface using event sourcing. | |
type captionRepository struct { | |
eventStore pkg_domain.EventStore | |
} | |
// NewCaptionRepository creates a new caption repository with the given event store. | |
func NewCaptionRepository(eventStore pkg_domain.EventStore) domain_caption.Repository { | |
return &captionRepository{ | |
eventStore: eventStore, | |
} | |
} | |
// Save saves changes made to the caption aggregate to the event store within the provided session. | |
func (r *captionRepository) Save(ctx context.Context, artwork domain_caption.CaptionAggregate, sess pkg_domain.EventStoreSession) error { | |
// Check if there are changes to be saved. | |
if len(artwork.Changes()) > 0 { | |
var meta metadata.EventMessageMetadata | |
// Iterate through each change in the aggregate's changes. | |
for _, ev := range artwork.Changes() { | |
// Retrieve metadata from the context. | |
meta = metadata.EventMessageMetadataFromContext(ctx) | |
// If metadata is present, associate it with the event. | |
if !meta.IsEmpty() { | |
ev.WithMetadata(&meta) | |
} | |
} | |
} | |
// Save changes to the event store within the provided session. | |
return sess.Save(artwork.Changes()) | |
} | |
// Get retrieves the caption aggregate from the event store based on the provided stream ID. | |
func (r *captionRepository) Get(ctx context.Context, streamID domain.ArtworkID) (domain_caption.CaptionAggregate, error) { | |
var agg domain_caption.CaptionAggregate | |
// Retrieve events from the event store based on stream ID and stream name. | |
events, err := r.eventStore.GetStream(ctx, streamID.Value(), domain_caption.StreamName) | |
if err != nil { | |
// Handle errors, checking if the stream is not found. | |
switch { | |
case errors.Is(err, pkg_domain.ErrEventStoreStreamNotFound): | |
// do we need to modify this? | |
} | |
return agg, err | |
} | |
// Reconstruct the caption aggregate from the retrieved events. | |
return domain_caption.FromHistory(ctx, events) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* pkg/domain - EventStore interface*/ | |
package domain | |
import ( | |
"context" | |
) | |
// EventStore represents an interface for an event store responsible for managing domain events. | |
type EventStore interface { | |
// NewStoreSession creates a new event store session that holds all events to be committed in a single session. | |
NewStoreSession() EventStoreSession | |
// Get retrieves a specific event by ID. | |
Get(ctx context.Context, id string) (EventMessage, error) | |
// FindAll retrieves all stored events. | |
FindAll(ctx context.Context) ([]EventMessage, error) | |
// GetStream retrieves all events for a specific stream (aggregate) by ID and name. | |
GetStream(ctx context.Context, streamID string, streamName string) ([]EventMessage, error) | |
// GetStreamEventsByType retrieves events for a specific stream by type. | |
GetStreamEventsByType(ctx context.Context, streamID string, streamName string, eventType string) ([]EventMessage, error) | |
// Commit persists the "stored events" in the session. | |
Commit(ctx context.Context, session EventStoreSession) error | |
// SubscribeTransactionalEventHandlers subscribes transactional event handlers for processing events. | |
SubscribeTransactionalEventHandlers(...TransactionalEventHandler) error | |
// SubscribeIntegrationEventMessageAdapters subscribes integration event message adapters. | |
SubscribeIntegrationEventMessageAdapters(adapters ...IntegrationEventMessageAdapter) error | |
} | |
// EventStoreSession represents a session for saving events in aggregates. | |
type EventStoreSession interface { | |
ID() string // ID returns the session ID. | |
Save(events []EventMessage) error // Save saves the provided events in the session. | |
Get() []EventMessage // Get retrieves all events in the session. | |
IsEmpty() bool // IsEmpty checks if the session is empty. | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package postgres | |
// EventStore with transactional support ensures data integrity within the bounded context. | |
// It facilitates the persistence of projections, read models, etc., within a transaction, | |
// while the aggregate events are being stored in the events store. | |
// It is crucial to use event handlers specific to the bounded context. For instance, when | |
// provisioning a new user, ensure that user aggregate events are processed by the relevant | |
// handlers before committing them to the event store via the user repository. | |
// Any failure in either the event handling or persistence of projections will result in the | |
// failure of both, emphasizing the importance of maintaining data consistency. | |
import ( | |
"context" | |
"encoding/json" | |
"errors" | |
"fmt" | |
"time" | |
"github.com/jackc/pgx/v4" | |
"github.com/jackc/pgx/v4/pgxpool" | |
"go.uber.org/zap" | |
"go.uber.org/zap/zapcore" | |
"google.golang.org/protobuf/proto" | |
domain "bleyk.org/pkg/domain" | |
apperrors "bleyk.org/pkg/errors" | |
"bleyk.org/pkg/integration" | |
"bleyk.org/pkg/logger" | |
"bleyk.org/pkg/metadata" | |
"bleyk.org/pkg/protobuf" | |
) | |
// eventStore is a PostgreSQL implementation of the EventStore interface. | |
type eventStore struct { | |
domainEventHandlers []domain.TransactionalEventHandler | |
integrationEventsAdapters []domain.IntegrationEventMessageAdapter | |
integrationEventsStore integration.EventStore | |
queuedIntegrationEventMessages []integration.EventMessage | |
db *pgxpool.Pool | |
logger logger.Logger | |
} | |
// eventMsgDB mirrors one row of eventstore.domain_events as it is scanned
// from the database.
type eventMsgDB struct {
	id      int64  // id column; result sets are ordered by it
	eventID string // event_id column
	// userID string
	streamID      string      // stream_id column
	streamName    string      // stream_name column
	streamSeqNo   int64       // stream_seq_no column
	eventType     string      // event_type column; used to decode binaryPayload
	payload       interface{} // payload column; only selected by FindAll
	binaryPayload []byte      // binary_payload column; the marshalled protobuf message
	metadata      []byte      // metadata column; the encoded event metadata
	eventTime     time.Time   // event_time column
}
// NewEventStore creates a new PostgreSQL event store. | |
func NewEventStore( | |
integrationEventsStore integration.EventStore, | |
db *pgxpool.Pool, | |
logger logger.Logger, | |
) domain.EventStore { | |
return &eventStore{ | |
integrationEventsStore: integrationEventsStore, | |
domainEventHandlers: []domain.TransactionalEventHandler{}, | |
queuedIntegrationEventMessages: []integration.EventMessage{}, | |
db: db, | |
logger: logger, | |
} | |
} | |
// NewStoreSession creates a new event store session. | |
func (s *eventStore) NewStoreSession() domain.EventStoreSession { | |
return NewEventStoreSession() | |
} | |
// Commit saves the changes made during the current session to the event store. | |
func (s *eventStore) Commit(ctx context.Context, | |
eventstoreSession domain.EventStoreSession, | |
) error { | |
var err error | |
commitLogErrMsg := "commit failed" | |
if eventstoreSession.IsEmpty() { | |
s.logger.Warn(ctx, "empty event store session provided") | |
return domain.ErrEventStoreContextWithoutSession | |
} | |
eventsNames := []string{} | |
for _, em := range eventstoreSession.Get() { | |
eventsNames = append(eventsNames, string(em.GetPayload().ProtoReflect().Descriptor().FullName())) | |
} | |
logFields := []zapcore.Field{ | |
zap.Strings("events", eventsNames), | |
zap.String("transaction_session", eventstoreSession.ID()), | |
} | |
tx, err := s.db.Begin(ctx) | |
if err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "transaction error"), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
// Rollback is safe to call even if the tx is already closed; | |
// if the tx commits successfully, this is a no-op | |
defer tx.Rollback(ctx) | |
// Run through events | |
// each event MUST be handled by `transactional handler` | |
for _, em := range eventstoreSession.Get() { | |
messageName := em.GetPayload().ProtoReflect().Descriptor().FullName() | |
// Handle event by transactional event handlers | |
hasTransHandler := false | |
for _, h := range s.domainEventHandlers { | |
if h.Handles(messageName) { | |
hasTransHandler = true | |
err = h.Handle(ctx, tx, messageName, em) | |
if err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "domain event handler failed"), | |
zap.String("event_type", string(messageName)), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
} | |
} | |
// err if no `transactional handler` was provided for the event | |
if !hasTransHandler { | |
reason := "domaineventstore: no transactional handler" | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("error", reason), | |
zap.String("event_type", string(messageName)), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.New(ctx, reason) | |
} | |
// adapt domain event to an integration event if an adapter for it is registered | |
if len(s.integrationEventsAdapters) > 0 { | |
for _, adapter := range s.integrationEventsAdapters { | |
if adapter.CanAdapt(em) { | |
integrationEvent, err := adapter.Adapt(ctx, em) | |
if err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "integration event adapter failed"), | |
zap.String("event_type", string(messageName)), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
// Set "queued for publishing" status | |
integrationEvent.WithStatus(integration.MessageStatus_PUBLISH_QUEUED) | |
// append message to the queue | |
s.queuedIntegrationEventMessages = append(s.queuedIntegrationEventMessages, integrationEvent) | |
} | |
} | |
} | |
} | |
// persist domain events | |
if err := s.persistToDB(ctx, tx, eventstoreSession.Get()...); err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "failed to persist domain event messages"), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
// persist integration events | |
if len(s.queuedIntegrationEventMessages) > 0 { | |
if err := s.integrationEventsStore.Save(ctx, tx, s.queuedIntegrationEventMessages...); err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "failed to persist integration event messages"), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
} | |
// Commit tx | |
err = tx.Commit(ctx) | |
if err != nil { | |
logFields = append(logFields, []zapcore.Field{ | |
zap.String("reason", "pg transaction failed"), | |
zap.String("error", s.logger.MakeStringJsonSafe(err.Error())), | |
}...) | |
s.logger.Error(ctx, commitLogErrMsg, logFields...) | |
return apperrors.Wrap(ctx, err) | |
} | |
// once the transaction is commited succesfully, we can publish the integration events (if any) | |
if len(s.queuedIntegrationEventMessages) > 0 { | |
s.integrationEventsStore.Publish(ctx, s.queuedIntegrationEventMessages...) | |
// IMPORTANT! must clear queued integration events from the slice | |
s.queuedIntegrationEventMessages = []integration.EventMessage{} | |
} | |
s.logger.Debug(ctx, "commit completed", logFields...) | |
return nil | |
} | |
// Get retrieves a single event by its ID. | |
func (s *eventStore) Get(ctx context.Context, id string) (domain.EventMessage, error) { | |
return nil, nil | |
} | |
// FindAll retrieves all events. | |
func (s *eventStore) FindAll(ctx context.Context) ([]domain.EventMessage, error) { | |
query := ` | |
SELECT | |
id, | |
event_id, | |
stream_id, | |
stream_name, | |
stream_seq_no, | |
event_type, | |
payload, | |
binary_payload, | |
metadata, | |
event_time | |
FROM eventstore.domain_events | |
ORDER BY | |
id ASC | |
LIMIT 1 | |
` | |
rows, err := s.db.Query( | |
context.Background(), | |
query, | |
) | |
if err != nil { | |
if errors.Is(err, pgx.ErrNoRows) { | |
return nil, domain.ErrEventStoreStreamNotFound | |
} | |
s.logger.Error(ctx, "FindAll failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
defer rows.Close() | |
events := []domain.EventMessage{} | |
// Iterate through the result set | |
for rows.Next() { | |
eventMsg := eventMsgDB{} | |
err = rows.Scan( | |
&eventMsg.id, | |
&eventMsg.eventID, | |
&eventMsg.streamID, | |
&eventMsg.streamName, | |
&eventMsg.streamSeqNo, | |
&eventMsg.eventType, | |
&eventMsg.payload, | |
&eventMsg.binaryPayload, | |
&eventMsg.metadata, | |
&eventMsg.eventTime, | |
) | |
if err != nil { | |
s.logger.Error(ctx, "FindAll failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
pmsg, err := protobuf.ProtoMessageFromBytes(eventMsg.eventType, eventMsg.binaryPayload) | |
if err != nil { | |
s.logger.Error(ctx, "FindAll failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
em := domain.NewEventMessageFromProtoMessage(ctx, eventMsg.streamID, eventMsg.streamName, int(eventMsg.streamSeqNo), pmsg) | |
// set time | |
em.WithEventTime(eventMsg.eventTime) | |
// set metadata | |
md, err := metadata.EventMessageMetadataFromPayload(eventMsg.metadata) | |
if err == nil { | |
em.WithMetadata(md) | |
} | |
events = append(events, em) | |
} | |
return events, nil | |
} | |
// GetStream retrieves events for a specific stream. | |
func (s *eventStore) GetStream(ctx context.Context, streamID string, streamName string) ([]domain.EventMessage, error) { | |
query := ` | |
SELECT | |
id, | |
event_id, | |
stream_id, | |
stream_name, | |
stream_seq_no, | |
event_type, | |
binary_payload, | |
metadata, | |
event_time | |
FROM eventstore.domain_events | |
WHERE | |
stream_id=$1 AND stream_name=$2 | |
ORDER BY | |
id ASC | |
` | |
rows, err := s.db.Query( | |
context.Background(), | |
query, | |
streamID, | |
streamName, | |
) | |
// TODO check error type | |
if err != nil { | |
switch { | |
case errors.Is(err, pgx.ErrNoRows): | |
return nil, domain.ErrEventStoreStreamNotFound | |
} | |
s.logger.Error(ctx, "GetStream failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
defer rows.Close() | |
events := []domain.EventMessage{} | |
// Iterate through the result set | |
for rows.Next() { | |
eventMsg := eventMsgDB{} | |
err = rows.Scan( | |
&eventMsg.id, | |
&eventMsg.eventID, | |
&eventMsg.streamID, | |
&eventMsg.streamName, | |
&eventMsg.streamSeqNo, | |
&eventMsg.eventType, | |
// &eventMsg.payload, | |
&eventMsg.binaryPayload, | |
&eventMsg.metadata, | |
&eventMsg.eventTime, | |
) | |
// TODO check error | |
if err != nil { | |
s.logger.Error(ctx, "GetStream failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
pmsg, err := protobuf.ProtoMessageFromBytes(eventMsg.eventType, eventMsg.binaryPayload) | |
if err != nil { | |
s.logger.Error(ctx, "GetStream failed", zap.String("error", s.logger.MakeStringJsonSafe(err.Error()))) | |
return nil, apperrors.Wrap(ctx, err) | |
} | |
em := domain.NewEventMessageFromProtoMessage(ctx, eventMsg.streamID, eventMsg.streamName, int(eventMsg.streamSeqNo), pmsg) | |
// set time | |
em.WithEventTime(eventMsg.eventTime) | |
// set metadata | |
md, err := metadata.EventMessageMetadataFromPayload(eventMsg.metadata) | |
if err == nil { | |
em.WithMetadata(md) | |
} | |
events = append(events, em) | |
} | |
if len(events) == 0 { | |
return nil, domain.ErrEventStoreStreamNotFound | |
} | |
return events, nil | |
} | |
// GetStreamEventsByType retrieves events of a specific type for a stream. | |
func (s *eventStore) GetStreamEventsByType(ctx context.Context, streamID string, streamName string, eventType string) ([]domain.EventMessage, error) { | |
return nil, nil | |
} | |
// SubscribeTransactionalEventHandlers subscribes transactional event handlers. | |
func (s *eventStore) SubscribeTransactionalEventHandlers(handlers ...domain.TransactionalEventHandler) error { | |
for _, handler := range handlers { | |
// check if it was already susbcribed | |
alreadyExists := false | |
// | |
for _, v := range s.domainEventHandlers { | |
if v.Name() == handler.Name() { | |
s.logger.Warn(context.TODO(), | |
fmt.Sprintf("transactional handler '%s' already subscribed", v.Name()), | |
) | |
} | |
} | |
if !alreadyExists { | |
s.domainEventHandlers = append(s.domainEventHandlers, handler) | |
} | |
} | |
return nil | |
} | |
// SubscribeIntegrationEventMessageAdapters subscribes integration event message adapters. | |
func (s *eventStore) SubscribeIntegrationEventMessageAdapters(adapters ...domain.IntegrationEventMessageAdapter) error { | |
s.integrationEventsAdapters = append(s.integrationEventsAdapters, adapters...) | |
// check if duplicates, then ... | |
// for _, adapter := range adapters { | |
// s.integrationEventsAdapters = append(s.integrationEventsAdapters, adapter) | |
// } | |
return nil | |
} | |
// HasAccountId is satisfied by event payload messages that expose an account
// ID; it is intended for checking whether a payload carries one.
//
// NOTE(review): nothing in the code visible here uses this interface yet.
type HasAccountId interface {
	// GetAccountId returns the account ID of the payload.
	GetAccountId() string
}
// persistToDB persists events to the PostgreSQL database. | |
func (s *eventStore) persistToDB(ctx context.Context, tx pgx.Tx, eventMessages ...domain.EventMessage) error { | |
rows := [][]interface{}{} | |
if len(eventMessages) == 0 { | |
s.logger.Warn(ctx, "persit called with no events to persist", | |
s.logger.WithStackTrace("stacktrace"), | |
) | |
return nil | |
} | |
// for each event | |
for _, event := range eventMessages { | |
binPayload, err := proto.Marshal(event.GetPayload()) | |
if err != nil { | |
return apperrors.Wrap(ctx, err) | |
} | |
metadata, err := json.Marshal(event.GetMetadata()) | |
if err != nil { | |
return apperrors.Wrap(ctx, err) | |
} | |
// t, ok := (event.GetPayload()).(WithAccountId) | |
// if ok { | |
// util.DebugJSON(t.GetAccountId()) | |
// } | |
rows = append(rows, []interface{}{ | |
event.GetId(), | |
event.GetStreamId(), | |
event.GetStreamName(), | |
event.GetStreamSeqenceNo(), | |
event.GetEventType(), | |
event.GetPayload(), | |
binPayload, | |
metadata, | |
event.GetEventTime().UTC(), | |
}) | |
} | |
tableIdentifier := pgx.Identifier{ | |
"eventstore", "domain_events", | |
} | |
// copyCount | |
_, err := tx.CopyFrom( | |
ctx, | |
tableIdentifier, | |
[]string{ | |
"event_id", | |
"stream_id", | |
"stream_name", | |
"stream_seq_no", | |
"event_type", | |
"payload", | |
"binary_payload", | |
"metadata", | |
"event_time", | |
}, | |
pgx.CopyFromRows(rows), | |
) | |
return err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment