Created
June 9, 2020 15:23
-
-
Save ayende/6351d6989f111f0a4f84b301611525ce to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Net.Sockets; | |
using System.Threading.Tasks; | |
using Nito.AsyncEx; | |
using Raven.Client.Documents; | |
using System.Reactive.Linq; | |
using System.Threading; | |
using Raven.Client.Documents.Indexes; | |
using Raven.Client.Documents.Linq.Indexing; | |
using Raven.Client.Documents.Operations.CompareExchange; | |
using Raven.Client.Documents.Session; | |
using Raven.Client.Documents.Session.Loaders; | |
using Raven.Client.Documents.Subscriptions; | |
using Raven.Client.Exceptions; | |
using Raven.Client.Exceptions.Documents.Subscriptions; | |
using Raven.Client.Json; | |
namespace Crypto | |
{ | |
class Program | |
{ | |
/// <summary>
/// Demo entry point: configures the document store, deploys the
/// <c>ReadyCommands</c> index and the subscriptions, starts three competing
/// command processors and then registers two employees.
/// </summary>
/// <param name="args">Command line arguments (unused).</param>
static async Task Main(string[] args)
{
    using var store = new DocumentStore
    {
        Urls = new[] { "http://live-test.ravendb.net" },
        Database = "Demo",
    };

    // Every type deriving from Command is stored in the single "Commands"
    // collection; all other types keep the default collection name.
    var defaultCollectionName = store.Conventions.FindCollectionName;
    store.Conventions.FindCollectionName = type =>
        typeof(Command).IsAssignableFrom(type) ?
            "Commands" : defaultCollectionName(type);
    store.Initialize();

    await new ReadyCommands().ExecuteAsync(store);

    await CreateCommandsSubscription(store);
    // or
    await CreateSubscription<SendNewEmployeeMailCommand>(store);
    await CreateSubscription<SendBirthdayCardForEmployeeCommand>(store);

    //var _ = SendNewEmployeeEmail(store).ContinueWith(PrintStatus);

    var tasks = new[]
    {
        CompetingProcessors_ClusterVersion(store, "1"),
        CompetingProcessors_ClusterVersion(store, "2"),
        CompetingProcessors_ClusterVersion(store, "3")
    };

    // The processors loop forever and are never awaited. Without a
    // continuation a failure inside one of them would go unnoticed, so report
    // how each one ended if it ever stops.
    foreach (var processor in tasks)
    {
        _ = processor.ContinueWith(PrintStatus);
    }

    await RegisterNewEmployee(store, "Ayende", new DateTime(1980, 1, 2));
    await RegisterNewEmployee(store, "Oren", new DateTime(1980, 1, 2));

    Console.WriteLine("Done...");
    Console.ReadLine();
}
/// <summary>
/// Writes a one-line summary of how <paramref name="task"/> ended to the
/// console. For a faulted task it also prints the message of every inner
/// exception, following each <c>InnerException</c> chain to its end.
/// </summary>
/// <param name="task">The task whose status is reported.</param>
private static void PrintStatus(Task task)
{
    var status = task.Status;

    if (status == TaskStatus.RanToCompletion)
    {
        Console.WriteLine("Done successfully");
        return;
    }

    if (status == TaskStatus.Canceled)
    {
        Console.WriteLine("Task cancelled");
        return;
    }

    if (status != TaskStatus.Faulted)
    {
        Console.WriteLine("No idea: " + task.Status);
        return;
    }

    Console.WriteLine("Error!");
    foreach (var failure in task.Exception.InnerExceptions)
    {
        // Walk from the outermost exception down to the root cause.
        for (var current = failure; current != null; current = current.InnerException)
        {
            Console.WriteLine(current.Message);
        }
    }
}
/// <summary>
/// Competing-consumer loop that uses a <c>lockers/{commandId}</c> document as
/// the lock. Each iteration queries the <c>ReadyCommands</c> index for one
/// command, tries to create its locker document under optimistic
/// concurrency, handles the command, and then deletes both the command and
/// the locker. The loop never ends on its own.
/// </summary>
/// <param name="store">Document store used for change notifications and sessions.</param>
/// <param name="id">Identifier of this processor; stored in the locker and printed by the handler.</param>
public static async Task CompetingProcessors(IDocumentStore store, string id)
{
    var hasMsgs = new AsyncManualResetEvent(true);

    // Hold on to the subscription so it is released if the loop ever faults.
    using var indexChanges = store.Changes()
        .ForIndex(new ReadyCommands().IndexName)
        .Subscribe(x => hasMsgs.Set());

    do
    {
        // Re-arm the event BEFORE querying. A manual-reset event stays set
        // until it is reset, so without this the WaitAsync below would stop
        // blocking after the first notification and the loop would busy-poll.
        // Resetting before the query (not after) means a notification that
        // arrives while the query is running is not lost.
        hasMsgs.Reset();

        using (var session = store.OpenAsyncSession())
        {
            var result = await session.Query<Command, ReadyCommands>()
                .OrderBy("Date")
                .Include(include => include.IncludeDocuments<Locker>(x => x.Id))
                .FirstOrDefaultAsync();
            if (result == null)
            {
                // Nothing ready: sleep until the index reports a change.
                await hasMsgs.WaitAsync(/*timeout*/);
                continue;
            }

            var locker = await session.LoadAsync<Locker>("lockers/" + result.Id);
            if (locker != null)
            {
                // index didn't catch up? we'll retry
                continue;
            }

            locker = new Locker
            {
                ClientId = id
            };
            await session.StoreAsync(locker, "lockers/" + result.Id);
            // The lock carries an expiry two minutes from now.
            session.Advanced.GetMetadataFor(locker)["@expires"] = DateTime.UtcNow.AddMinutes(2);
            session.Advanced.UseOptimisticConcurrency = true;
            try
            {
                await session.SaveChangesAsync();
            }
            catch (ConcurrencyException)
            {
                // expected, someone else got the lock, will retry
                continue;
            }

            await HandleCommand(session, result, id);

            // Remove the handled command and release the lock in one save.
            session.Delete(result.Id);
            session.Delete(locker);
            try
            {
                await session.SaveChangesAsync();
            }
            catch (ConcurrencyException)
            {
                // we timed out on the lock and it expired, probably
                continue;
            }
        }
    } while (true);
}
/// <summary>
/// Competing-consumer loop that uses a compare-exchange value, keyed by the
/// command id, as the lock. Every session is opened in cluster-wide
/// transaction mode. Each iteration picks one command from the
/// <c>ReadyCommands</c> index in random order, tries to create the
/// compare-exchange lock, handles the command, and then deletes both the
/// command and the lock. The loop never ends on its own.
/// </summary>
/// <param name="store">Document store used for change notifications and sessions.</param>
/// <param name="id">Identifier of this processor; stored in the locker and printed by the handler.</param>
public static async Task CompetingProcessors_ClusterVersion(IDocumentStore store, string id)
{
    var hasMsgs = new AsyncManualResetEvent(true);

    // Hold on to the subscription so it is released if the loop ever faults.
    using var commandChanges = store.Changes()
        .ForDocumentsInCollection<Command>()
        .Subscribe(x => hasMsgs.Set());

    do
    {
        // Re-arm the event BEFORE querying. A manual-reset event stays set
        // until it is reset, so without this the WaitAsync below would stop
        // blocking after the first notification and the loop would busy-poll.
        // Resetting before the query (not after) means a notification that
        // arrives while the query is running is not lost.
        hasMsgs.Reset();

        using (var session = store.OpenAsyncSession(new SessionOptions
        {
            TransactionMode = TransactionMode.ClusterWide
        }))
        {
            var result = await session.Query<Command, ReadyCommands>()
                .OrderBy("random()")
                .Include(include => include.IncludeCompareExchangeValue(x => x.Id))
                .FirstOrDefaultAsync();
            if (result == null)
            {
                // Nothing ready: sleep until a command document changes.
                await hasMsgs.WaitAsync();
                continue;
            }

            var locker = await session.Advanced.ClusterTransaction.GetCompareExchangeValueAsync<Locker>(result.Id);
            if (locker != null)
            {
                // index didn't catch up? we'll retry
                continue;
            }

            locker = session.Advanced.ClusterTransaction.CreateCompareExchangeValue(result.Id, new Locker
            {
                ClientId = id
            });
            // The lock carries an expiry two minutes from now.
            locker.Metadata["@expires"] = DateTime.UtcNow.AddMinutes(2);
            try
            {
                await session.SaveChangesAsync();
            }
            catch (ConcurrencyException)
            {
                // expected, someone else got the lock, will retry
                continue;
            }

            await HandleCommand(session, result, id);

            // Remove the handled command and release the lock in one save.
            session.Delete(result.Id);
            session.Advanced.ClusterTransaction.DeleteCompareExchangeValue(locker);
            try
            {
                await session.SaveChangesAsync();
            }
            catch (ConcurrencyException)
            {
                // we timed out on the lock and it expired, probably
                continue;
            }
        }
    } while (true);
}
/// <summary>
/// Placeholder command handler: prints which processor handles which command.
/// </summary>
/// <param name="session">Session the command was loaded in (currently unused).</param>
/// <param name="cmd">The command being handled; only its <c>Id</c> is read.</param>
/// <param name="id">Identifier of the processor doing the work.</param>
/// <returns>An already-completed task; the handler does no asynchronous work.</returns>
private static Task HandleCommand(IAsyncDocumentSession session, Command cmd, string id)
{
    Console.WriteLine($"Processor #{id} Handling command: {cmd.Id}");

    // There is nothing to await here, so return a completed task instead of
    // marking the method async (which only produced warning CS1998).
    return Task.CompletedTask;
}
/// <summary>
/// Lock record created by a competing processor when it claims a command.
/// </summary>
public class Locker
{
    /// <summary>Identifier of the processor that took the lock.</summary>
    public string ClientId;
}
/// <summary>
/// Index over <see cref="Command"/> documents that have no matching
/// <c>lockers/{commandId}</c> document. Each entry exposes a <c>Date</c>
/// field taken from the command's <c>@last-modified</c> metadata.
/// </summary>
public class ReadyCommands : AbstractIndexCreationTask<Command>
{
    public ReadyCommands()
    {
        // The map expression is kept token-for-token as written: it is the
        // index definition itself, not ordinary client-side code.
        Map = cmds =>
            from cmd in cmds
            let locker = LoadDocument<Locker>("lockers/" + cmd.Id)
            where locker == null
            select new
            {
                Date = MetadataFor(cmd)["@last-modified"]
            };
    }
}
/// <summary>
/// Starts a worker on the <c>SendNewEmployeeMailCommand_Subscription</c>
/// subscription. For every command in a batch it loads the referenced
/// employee and sends the welcome mail; on success the command is deleted, on
/// failure the exception text is recorded in the command's <c>Error</c>
/// field. All changes of a batch are saved together.
/// </summary>
/// <param name="store">Document store that owns the subscription.</param>
/// <returns>The task returned by the worker's <c>Run</c> call.</returns>
public static Task SendNewEmployeeEmail(IDocumentStore store)
{
    var worker = store.Subscriptions.GetSubscriptionWorker<SendNewEmployeeMailCommand>(
        new SubscriptionWorkerOptions("SendNewEmployeeMailCommand_Subscription")
        {
            Strategy = SubscriptionOpeningStrategy.WaitForFree
        });

    return worker.Run(async batch =>
    {
        using var session = batch.OpenAsyncSession();
        Console.WriteLine($"Starting batch with {batch.NumberOfItemsInBatch} items");

        foreach (var item in batch.Items)
        {
            SendNewEmployeeMailCommand cmd = item.Result;
            var emp = await session.LoadAsync<Employee>(cmd.Employee);
            try
            {
                SendEmail(emp);
                session.Delete(cmd);
                // No 'break' here: the previous version left the loop after
                // the first successful mail, so the remaining commands of the
                // batch were never handled.
            }
            catch (Exception e)
            {
                // Keep the failure on the command instead of failing the batch.
                cmd.Error = e.ToString();
            }
        }

        await session.SaveChangesAsync();
    });
}
/// <summary>
/// Stand-in for sending a welcome mail: prints a greeting for the employee.
/// </summary>
/// <param name="emp">Employee to greet.</param>
/// <exception cref="ArgumentException">The employee's name is longer than four characters.</exception>
private static void SendEmail(Employee emp)
{
    var nameIsTooLong = emp.Name.Length > 4;
    if (nameIsTooLong)
    {
        throw new ArgumentException("Max emp name is 4");
    }

    var greeting = $"{emp.Name}, welcome! ({emp.Id})";
    Console.WriteLine(greeting);
}
/// <summary>
/// Ensures the <c>Commands_Subscription</c> subscription exists. If its state
/// can be fetched nothing is done; otherwise it is created over
/// <see cref="Command"/> documents whose <c>Error</c> is null.
/// </summary>
/// <param name="store">Document store that owns the subscription.</param>
public static async Task CreateCommandsSubscription(IDocumentStore store)
{
    const string subscriptionName = "Commands_Subscription";

    var alreadyExists = true;
    try
    {
        await store.Subscriptions.GetSubscriptionStateAsync(subscriptionName);
    }
    catch (SubscriptionDoesNotExistException)
    {
        // expected on the first run
        alreadyExists = false;
    }

    if (alreadyExists)
    {
        return;
    }

    var options = new SubscriptionCreationOptions<Command>
    {
        Name = subscriptionName,
        Filter = c => c.Error == null
    };
    await store.Subscriptions.CreateAsync(options);
}
/// <summary>
/// Ensures a subscription named <c>{TypeName}_Subscription</c> exists for the
/// command type <typeparamref name="T"/>. If its state can be fetched nothing
/// is done; otherwise it is created with a query over the <c>Commands</c>
/// collection that keeps documents whose <c>Type</c> equals the type name,
/// whose <c>Error</c> is null and whose metadata has no <c>@refresh</c> entry.
/// </summary>
/// <typeparam name="T">The command type the subscription is for.</typeparam>
/// <param name="store">Document store that owns the subscription.</param>
public static async Task CreateSubscription<T>(IDocumentStore store)
    where T : Command
{
    var commandTypeName = typeof(T).Name;
    var subscriptionName = $"{commandTypeName}_Subscription";

    var alreadyExists = true;
    try
    {
        await store.Subscriptions.GetSubscriptionStateAsync(subscriptionName);
    }
    catch (SubscriptionDoesNotExistException)
    {
        // expected on the first run
        alreadyExists = false;
    }

    if (alreadyExists)
    {
        return;
    }

    // The query text below is kept exactly as written, including its layout.
    var options = new SubscriptionCreationOptions
    {
        Name = subscriptionName,
        Query = $@"
declare function filter(c){{
return c.Type == '{commandTypeName}' &&
c.Error == null &&
!c['@metadata'].hasOwnProperty('@refresh');
}}
from Commands as c where filter(c)
"
    };
    await store.Subscriptions.CreateAsync(options);
}
/// <summary>
/// Stores a new employee together with a welcome-mail command and a
/// birthday-card command in one session save. The birthday-card command gets
/// an <c>@refresh</c> metadata entry set to the employee's next birthday.
/// </summary>
/// <param name="store">Document store to write to.</param>
/// <param name="name">Name of the new employee.</param>
/// <param name="birthday">The employee's date of birth; only month and day are used for the refresh date.</param>
public static async Task RegisterNewEmployee(IDocumentStore store, string name, DateTime birthday)
{
    // Read "today" once so both the year and the comparison use the same day.
    // NOTE(review): this is the local date — confirm whether @refresh expects UTC.
    var today = DateTime.Today;

    var nextBirthday = BirthdayInYear(birthday, today.Year);
    if (nextBirthday < today)
        nextBirthday = BirthdayInYear(birthday, today.Year + 1);

    using var session = store.OpenAsyncSession();

    var newEmployee = new Employee(name, birthday);
    await session.StoreAsync(newEmployee);
    await session.StoreAsync(new SendNewEmployeeMailCommand(newEmployee.Id));

    var birthdayCmd = new SendBirthdayCardForEmployeeCommand
    {
        Employee = newEmployee.Id,
    };
    await session.StoreAsync(birthdayCmd);
    session.Advanced.GetMetadataFor(birthdayCmd)["@refresh"] = nextBirthday;

    await session.SaveChangesAsync(); // single transaction
}

/// <summary>
/// Returns the date on which <paramref name="birthday"/> is celebrated in
/// <paramref name="year"/>. A 29 February birthday falls on 28 February in
/// years that have no leap day; constructing the date directly would throw
/// <see cref="ArgumentOutOfRangeException"/> in that case.
/// </summary>
private static DateTime BirthdayInYear(DateTime birthday, int year)
{
    var lastDayOfMonth = DateTime.DaysInMonth(year, birthday.Month);
    var day = Math.Min(birthday.Day, lastDayOfMonth);
    return new DateTime(year, birthday.Month, day);
}
} | |
/// <summary>
/// Employee document: a name, a date of birth and the document id.
/// </summary>
public class Employee
{
    /// <summary>Date of birth.</summary>
    public DateTime Birthday;

    /// <summary>Employee name.</summary>
    public string Name;

    /// <summary>Document id; not set by either constructor.</summary>
    public string Id;

    /// <summary>Creates an employee with all fields left at their defaults.</summary>
    public Employee()
    {
    }

    /// <summary>Creates an employee with the given name and date of birth.</summary>
    /// <param name="name">Employee name.</param>
    /// <param name="birthday">Date of birth.</param>
    public Employee(string name, DateTime birthday)
    {
        this.Name = name;
        this.Birthday = birthday;
    }
}
/// <summary>
/// Base class of all command documents.
/// </summary>
public class Command
{
    /// <summary>Document id.</summary>
    public string Id;

    /// <summary>Failure details; null while the command has not failed.</summary>
    public string Error;

    /// <summary>Name of the runtime type of this command.</summary>
    public string Type
    {
        get { return GetType().Name; }
    }
}
/// <summary>
/// A command that carries an array of other commands.
/// </summary>
public class BatchCommand : Command
{
    /// <summary>The commands contained in this batch.</summary>
    public Command[] Commands;
}
/// <summary>
/// Command asking for a birthday card to be sent to an employee.
/// </summary>
public class SendBirthdayCardForEmployeeCommand : Command
{
    /// <summary>Document id of the employee the card is for.</summary>
    public string Employee;
}
/// <summary>
/// Command asking for the welcome mail to be sent to a new employee.
/// </summary>
public class SendNewEmployeeMailCommand : Command
{
    /// <summary>Document id of the employee the mail is for.</summary>
    public string Employee;

    /// <summary>Creates a command with no employee assigned.</summary>
    public SendNewEmployeeMailCommand()
    {
    }

    /// <summary>Creates a command for the given employee.</summary>
    /// <param name="employee">Document id of the employee.</param>
    public SendNewEmployeeMailCommand(string employee) => Employee = employee;
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment