Created
May 28, 2023 16:31
-
-
Save APiercey/2960739a7ec3bd2bfdfdc4cc557d42d6 to your computer and use it in GitHub Desktop.
Repository Pattern for Event Sourcing with DynamoDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "securerandom"

# Base repository for event-sourced aggregates.
#
# Rebuilds an aggregate by replaying the events returned by the injected
# client, and persists an aggregate's pending changes as new versioned events.
# Concrete repositories subclass this class and configure it with the
# +aggregate+ and +event_builder+ class macros.
class DynamoDBRepo
  # Raised by #initialize when no aggregate class was configured on the repo class.
  class AggregateClassUndefined < StandardError
    def initialize(msg = "Aggregate class is not defined")
      super
    end
  end

  # Raised by #initialize when no event builder was configured on the repo class.
  class EventBuilderModuleUndefined < StandardError
    def initialize(msg = "Event builder module is not defined")
      super
    end
  end

  class << self
    # Configuration is kept in class-level instance variables, so each
    # subclass holds its own values (they are not shared with the base class).
    attr_reader :aggregate_class, :event_builder_module

    # Declares the class instantiated by #fetch.
    #
    # @param aggregate_class [Class] must respond to +new+ without arguments
    def aggregate(aggregate_class)
      @aggregate_class = aggregate_class
    end

    # Declares the object used to turn stored events back into event objects.
    #
    # @param event_builder [#build] called as +build(name, data)+
    def event_builder(event_builder)
      @event_builder_module = event_builder
    end
  end

  # @param dynamodb_client [#fetch_aggregate_events, #insert_aggregate_events!]
  # @raise [AggregateClassUndefined] when the class has no aggregate configured
  # @raise [EventBuilderModuleUndefined] when the class has no event builder configured
  def initialize(dynamodb_client)
    @dynamodb_client = dynamodb_client
    raise AggregateClassUndefined if aggregate_class.nil?
    raise EventBuilderModuleUndefined if event_builder_module.nil?
  end

  # Loads an aggregate by replaying its stored events.
  #
  # @param uuid [String] aggregate identifier passed to the client
  # @return [Object, nil] the rebuilt aggregate, or nil when no events exist
  #   or the replayed aggregate ends up without a uuid
  def fetch(uuid)
    events = @dynamodb_client.fetch_aggregate_events(uuid)
    return nil if events.empty?

    agg = aggregate_class.new
    # The version of the last returned event becomes the aggregate version;
    # this assumes the client returns events in ascending version order.
    agg.version = events.last.fetch("Version").to_i

    events
      .map { |raw_event| build_event(raw_event) }
      .compact # the builder may return nil for events it does not recognise
      .each { |event| agg.apply(event) }

    agg unless agg.uuid.nil?
  end

  # Persists the aggregate's pending changes as new events, numbering them
  # consecutively after the aggregate's current version.
  #
  # @param agg [Object] aggregate responding to +version+, +version=+,
  #   +uuid+, +changes+ and +clear_changes+
  # @return [Object] the same aggregate, with changes cleared and version advanced
  def store(agg)
    changes = agg.changes
    # Nothing to persist: skip the client call. An empty write transaction
    # is rejected by DynamoDB's TransactWriteItems.
    return agg if changes.empty?

    base_version = agg.version
    new_events = changes.each_with_index.map do |event, index|
      {
        EventUuid: SecureRandom.uuid,
        AggregateUuid: agg.uuid,
        Name: event.class::NAME,
        Data: event.to_h,
        Version: base_version + index + 1
      }
    end

    @dynamodb_client.insert_aggregate_events!(new_events)

    agg.clear_changes
    agg.version = base_version + new_events.size
    agg
  end

  private

  # Converts one stored event (a hash with "Name" and "Data") into an event object.
  def build_event(raw_event)
    event_builder_module.build(raw_event.fetch("Name"), raw_event.fetch("Data"))
  end

  def aggregate_class
    self.class.aggregate_class
  end

  def event_builder_module
    self.class.event_builder_module
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Thin wrapper around a DynamoDB client for reading and appending the events
# of an aggregate stored in a single table.
class EsDynamoTableClient
  # @param dynamodb_client [#query, #transact_write_items] low-level client
  # @param table_name [String] name of the table holding the events
  def initialize(dynamodb_client, table_name)
    @dynamodb_client = dynamodb_client
    @table_name = table_name
  end

  # Reads every event stored for one aggregate, following pagination until
  # the client stops returning a +last_evaluated_key+.
  #
  # @param aggregate_uuid [String] value matched against the AggregateUuid key
  # @return [Array] all items from all pages, in the order returned
  def fetch_aggregate_events(aggregate_uuid)
    query_options = {
      table_name: @table_name,
      key_condition_expression: "AggregateUuid = :aggregate_uuid",
      expression_attribute_values: {
        ":aggregate_uuid" => aggregate_uuid
      },
      consistent_read: true
    }

    items = []
    start_key = nil
    loop do
      # Only the follow-up requests carry an exclusive_start_key.
      options = start_key ? query_options.merge(exclusive_start_key: start_key) : query_options
      result = @dynamodb_client.query(options)
      items.concat(result.items)
      start_key = result.last_evaluated_key
      break unless start_key
    end
    items
  end

  # Writes all given events in one transaction. Each put is conditional on
  # the target item not already having a Version attribute, so the whole
  # transaction fails if any of the items already exists.
  #
  # @param events [Array<Hash>] items to insert
  # @return [nil]
  def insert_aggregate_events!(events)
    # TransactWriteItems requires at least one item; an empty batch is a no-op.
    return nil if events.empty?

    put_operations = events.map do |event|
      {
        put: {
          item: event,
          table_name: @table_name,
          condition_expression: "attribute_not_exists(#v)",
          expression_attribute_names: {
            "#v" => "Version"
          }
        }
      }
    end

    @dynamodb_client.transact_write_items({ transact_items: put_operations })
    nil
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Events
  # Rebuilds event objects from the name and data they were persisted with.
  module Builder
    # @param name [String] stored event name
    # @param data [Hash] stored event payload, read with string keys
    # @return [Object, nil] the matching event, or nil when the name is not
    #   one of the known event names
    # @raise [KeyError] when a key required by the matched event is missing
    def self.build(name, data)
      # No else branch: an unmatched name makes the case evaluate to nil.
      case name
      when "CartOpened"
        Events::CartOpened.new(data.fetch("shopping_cart_uuid"))
      when "ItemAdded"
        Events::ItemAdded.new(data.fetch("shopping_cart_uuid"), data.fetch("item_name"))
      when "CartClosed"
        Events::CartClosed.new(data.fetch("shopping_cart_uuid"))
      end
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Repository for ShoppingCart aggregates: configures the DynamoDBRepo base
# class with the aggregate class to instantiate and the builder used to
# rebuild its events.
class ShoppingCartRepo < DynamoDBRepo
  aggregate ShoppingCart
  event_builder Events::Builder
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment