Skip to content

Instantly share code, notes, and snippets.

@jkonecki
Created December 26, 2016 21:00
Show Gist options
  • Save jkonecki/c93960fee8ecaf374f451b05bb7a34d0 to your computer and use it in GitHub Desktop.
Save jkonecki/c93960fee8ecaf374f451b05bb7a34d0 to your computer and use it in GitHub Desktop.
JournaledGrain + SS
/*
https://github.com/dotnet/orleans/blob/master/src/OrleansEventSourcing/JournaledGrainState.cs
The example in the Orleans repo uses the existing storage providers to store the event stream as a List<Event>.
That is not needed when using StreamStone: instead of relying on a storage provider and the default WriteStateAsync(), the grain can read and write the event stream itself.
*/
// 1. Define class that will be used by StreamStone to represent event row
/// <summary>
/// Table row shape used by StreamStone to represent a single stored event.
/// </summary>
public class EventEntity : TableEntity
{
    /// <summary>Identifier of the event (presumably unique per event; confirm against the writer).</summary>
    public Guid Id { get; set; }

    /// <summary>Name of the event's CLR type; readers pass it to <c>Type.GetType</c> to pick the deserialization target.</summary>
    public string Type { get; set; }

    /// <summary>Serialized event payload; readers deserialize it with <c>JsonConvert.DeserializeObject</c>.</summary>
    public string Data { get; set; }

    /// <summary>Additional information stored alongside the event (format not visible here; confirm against the writer).</summary>
    public string Metadata { get; set; }

    /// <summary>
    /// Version of the event within its stream; readers use the last version of a slice
    /// plus one as the start of the next slice. Declared as <c>int</c> because Azure
    /// does not support <c>uint</c>.
    /// </summary>
    public int Version { get; set; }
}
// 2. Grain using SS
/// <summary>
/// Journaled grain that keeps its event stream in StreamStone: on activation the
/// stored events are read back and applied to the state, and
/// <see cref="WriteStateAsync"/> appends the pending events to the stream.
/// </summary>
/// <typeparam name="TState">Grain state type that the replayed events are applied to.</typeparam>
// NOTE(review): the Orleans sample linked at the top of this file names the state base class
// "JournaledGrainState", and Orleans grains normally expose "OnActivateAsync" — confirm that
// "JournaledGrainsState" and "OnActivate" below match the types actually referenced by the project.
public class EventSourcedGrain<TState> : JournaledGrain<TState>
    where TState : JournaledGrainsState
{
    // Number of events requested per storage round-trip while replaying the stream.
    // The original requested one event at a time, which costs one query per event.
    // NOTE(review): 1000 is believed to equal StreamStone's own default slice size — confirm.
    private const int ReadSliceSize = 1000;

    /// <summary>
    /// Replays every event stored in the grain's partition onto <c>State</c>, bumping
    /// <c>Version</c> once per applied event, then runs the base activation logic.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// A stored event names a type that <c>Type.GetType</c> cannot resolve.
    /// </exception>
    public override async Task OnActivate()
    {
        try
        {
            StreamSlice<EventEntity> slice;
            var nextSliceStart = 1;
            do
            {
                slice = await Stream.ReadAsync<EventEntity>(this.Partition, nextSliceStart, sliceSize: ReadSliceSize);
                foreach (var eventData in slice.Events)
                {
                    // Fail loudly when the stored type name can no longer be resolved; passing a
                    // null type to the deserializer would yield an untyped JSON object and the
                    // state would be asked to apply something that is not a real event.
                    var eventType = Type.GetType(eventData.Type);
                    if (eventType == null)
                    {
                        throw new InvalidOperationException(
                            "Cannot resolve event type '" + eventData.Type + "' for stored event version " + eventData.Version + ".");
                    }

                    // 'dynamic' lets the runtime pick the StateTransition overload for the concrete event type.
                    dynamic @event = JsonConvert.DeserializeObject(eventData.Data, eventType);
                    this.State.StateTransition(@event); // StateTransition must be made public for this call
                    this.Version++;
                }

                // Continue right after the last event read. -1 is a sentinel for an empty slice;
                // it is only harmless if an empty slice is also the end of the stream (presumably
                // always the case — confirm against StreamStone).
                nextSliceStart = slice.HasEvents ? slice.Events.Last().Version + 1 : -1;
            }
            while (!slice.IsEndOfStream);
        }
        catch (StreamDoesNotExistException)
        {
            // Deliberately ignored: no stream means there is nothing to replay.
            // NOTE(review): confirm this is the exception type StreamStone throws for a missing stream.
        }

        await base.OnActivate();
    }

    /// <summary>
    /// Writes the events currently held by <c>State</c> to the StreamStone stream at the
    /// grain's current <c>Version</c>, then clears them from the state.
    /// </summary>
    protected override async Task WriteStateAsync()
    {
        var events = this.State.Events;

        // Serialize events from State and write them using StreamStone.
        // NOTE(review): the events probably need converting to StreamStone's event data type first — confirm.
        await Stream.WriteAsync(this.Partition, this.Version, events);

        this.State.Clear(); // Clear the internal List<Event> now that the events are persisted
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment