Created
October 18, 2016 20:06
-
-
Save ruslander/0414202dd230690cd438b9463ef5bd7b to your computer and use it in GitHub Desktop.
StreamStone
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Configuration; | |
using System.Linq; | |
using Microsoft.WindowsAzure.Storage; | |
using Newtonsoft.Json; | |
using Streamstone; | |
namespace InServiceCqrs | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
var client = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]).CreateCloudTableClient(); | |
var table = client.GetTableReference("Streams"); | |
table.CreateIfNotExists(); | |
var partition = new Partition(table, "default"); | |
var stream = OpenOrCreate(partition); | |
AppendToStream(stream); | |
ReadAllFromStream(partition); | |
Console.ReadKey(); | |
} | |
private static void ReadAllFromStream(Partition partition) | |
{ | |
Console.WriteLine("Reading all events in a stream"); | |
Console.WriteLine("If slice size is > than WATS limit, continuation token will be managed automatically"); | |
StreamSlice<EventEnvelope> slice; | |
var nextSliceStart = 1; | |
do | |
{ | |
slice = Stream.Read<EventEnvelope>(partition, nextSliceStart, sliceSize: 1); | |
foreach (var @event in slice.Events) | |
Console.WriteLine("{0}:{1} {2}-{3}", @event.Id, @event.Version, @event.Type, @event.Data); | |
nextSliceStart = slice.HasEvents | |
? slice.Events.Last().Version + 1 | |
: -1; | |
} while (!slice.IsEndOfStream); | |
} | |
private static void AppendToStream(Stream stream) | |
{ | |
Console.WriteLine("Writing to new stream in partition '{0}'", stream.Partition); | |
var id = Guid.NewGuid().ToString(); | |
var result = Stream.Write(stream, | |
ToEventData(new InventoryItemCreated(id, "iPhone6")), | |
ToEventData(new InventoryItemCheckedIn(id, 100)) | |
); | |
Console.WriteLine("Succesfully written to new stream.\r\nEtag: {0}, Version: {1}", result.Stream.ETag, | |
result.Stream.Version); | |
} | |
private static Stream OpenOrCreate(Partition partition) | |
{ | |
Stream stream; | |
try | |
{ | |
stream = Stream.Provision(partition); | |
Console.WriteLine("Provisioned new empty stream in partition '{0}'", stream.Partition); | |
} | |
catch (Exception) | |
{ | |
stream = Stream.Open(partition); | |
Console.WriteLine("Openned stream in partition '{0}'", stream.Partition); | |
} | |
Console.WriteLine("Etag: {0}", stream.ETag); | |
Console.WriteLine("Version: {0}", stream.Version); | |
return stream; | |
} | |
static EventData ToEventData(object e) | |
{ | |
var id = Guid.NewGuid(); | |
return new EventData(EventId.From(id), EventProperties.From(new EventEnvelope | |
{ | |
Id = id, | |
Type = e.GetType().Name, | |
Data = JsonConvert.SerializeObject(e) | |
})); | |
} | |
} | |
class EventEnvelope | |
{ | |
public Guid Id { get; set; } | |
public string Type { get; set; } | |
public string Data { get; set; } | |
public int Version { get; set; } | |
} | |
public class InventoryItemCreated | |
{ | |
public readonly string Id; | |
public readonly string Name; | |
public InventoryItemCreated(string id, string name) | |
{ | |
Id = id; | |
Name = name; | |
} | |
} | |
public class InventoryItemCheckedIn | |
{ | |
public readonly string Id; | |
public readonly int Count; | |
public InventoryItemCheckedIn(string id, int count) | |
{ | |
Id = id; | |
Count = count; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment