Skip to content

Instantly share code, notes, and snippets.

@brunoliveira8
Created June 22, 2020 11:00
Show Gist options
  • Save brunoliveira8/2a25dd6a04f8a5002aaab51d7374ce47 to your computer and use it in GitHub Desktop.
Save brunoliveira8/2a25dd6a04f8a5002aaab51d7374ce47 to your computer and use it in GitHub Desktop.
Event Sourcing Concepts
import json
class EventStore:
base_location = 'events'
storage_format = 'jsonl'
@classmethod
def _get_event_storage_path(cls, key):
event_storage_path = f'{cls.base_location}/{key}.{cls.storage_format}'
return event_storage_path
@classmethod
def insert(cls, key, event):
event_storage_path = cls._get_event_storage_path(key)
with open(event_storage_path, 'a') as storage:
json.dump(event, storage)
storage.write('\n')
@classmethod
def get(cls, key):
event_storage_path = cls._get_event_storage_path(key)
with open(event_storage_path, 'r') as storage:
events = [json.loads(event) for event in storage]
return events
class ProjectAccountBalance:
def __init__(self, event_store: EventStore):
self.event_store = event_store
def get_balance(self, user_id: str):
events = self.event_store.get(key=user_id)
balance = sum(
event['amount'] for event in events
)
return balance
def main():
print('Starting...')
key = 'user_1'
event = {
'operation': 'deposit',
'amount': 50.0
}
account = ProjectAccountBalance(
event_store=EventStore
)
EventStore.insert(key, event)
print(account.get_balance(key))
print('Closing...')
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment