General purpose thread-safe event class.
Features:
- Thread-safety.
- Supports capturing the SynchronizationContext (which makes it usable in the Unity3d engine).
- Provides a `Task<T>` api for waiting for events.
- Provides a callback (Subscribe / Unsubscribe) api for receiving events.
- Reasonable performance.
- Ability to pass a custom `subscriptionToken` makes it easier to unsubscribe lambdas.
- Supports storing data that was invoked when no-one was subscribed (or waiting) yet.
Allocations:
- Invoke is allocation free (after it has warmed up some caches).
- Waiting using the `Task<T>` api allocates a handler on the heap.
- New subscriptions allocate a handler on the heap.
- Unsubscriptions are allocation free (Depending on 'Equals' implementation on your provided subscriptionToken).
Unity3d note: If you call 'Subscribe' from the unity-thread (and leave `callOnCapturedContext` at `true`) then your callback will only be invoked on the unity-thread, making it very safe to use.
Example:
/// <summary>
/// Usage example: consumes the events of a <c>Worker</c> first through the task api and
/// then through the callback api.
/// </summary>
class Program
{
    /// <summary>
    /// Waits for a single event asynchronously, then listens with a callback until a key
    /// is pressed.
    /// </summary>
    static async Task Main()
    {
        Worker worker = new Worker();
        IReadOnlySynchronizedEvent<int> workReady = worker.WorkReady;

        // Async style:
        int result = await workReady.WaitAsync();
        Console.WriteLine($"Got async: '{result}'");

        // Callback style:
        workReady.Subscribe(WorkReady);
        Console.ReadKey();
        workReady.Unsubscribe(WorkReady);
    }

    /// <summary>
    /// Callback used for the subscription; it doubles as the token for unsubscribing.
    /// </summary>
    static void WorkReady(int output)
    {
        Console.WriteLine($"Got from subscription: '{output}'");
    }

    /// <summary>
    /// Produces an event once per second from a background task and acts as the exception
    /// handler of its own event.
    /// </summary>
    class Worker : IExceptionHandler
    {
        // Writable side of the event; consumers only get the read-only view through 'WorkReady'.
        readonly SynchronizedEvent<int> outputEvent;

        // Reference to the producer loop that is started by the constructor.
        readonly Task backgroundWork;

        public Worker()
        {
            // Store events that fire before anyone listens so the first listener still gets them.
            this.outputEvent = new SynchronizedEvent<int>(exceptionHandler: this, storeUnobservedData: true);
            this.backgroundWork = Task.Run(BackgroundWorkAsync);
        }

        public IReadOnlySynchronizedEvent<int> WorkReady
        {
            get { return this.outputEvent; }
        }

        void IExceptionHandler.Handle(Exception e)
        {
            Console.Error.Write(e);
        }

        // Fires the event with a fixed value once every second, forever.
        async Task BackgroundWorkAsync()
        {
            for (;;)
            {
                await Task.Delay(1000);
                this.outputEvent.Invoke(42);
            }
        }
    }
}
`SynchronizedEvent<T>`:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Interface for exception handlers.
/// </summary>
/// <remarks>
/// <see cref="SynchronizedEvent{T}"/> requires an implementation at construction and passes
/// every exception that is thrown by a subscribed action to it.
/// </remarks>
public interface IExceptionHandler
{
/// <summary>
/// Handle given exception.
/// </summary>
/// <remarks>
/// Implementation should not rethrow in this call-stack. <see cref="SynchronizedEvent{T}"/>
/// calls this from inside its catch block without any further protection, so an exception
/// thrown here escapes to whoever was invoking the subscriber.
/// </remarks>
/// <param name="exception">Exception to handle.</param>
void Handle(Exception exception);
}
/// <summary>
/// Listen-only view of a thread-safe event that supports task and callback style listening.
/// </summary>
/// <remarks>
/// This interface only contains members for waiting, subscribing and unsubscribing; it has
/// no member for firing the event.
/// </remarks>
/// <typeparam name="T">Type of the event data.</typeparam>
public interface IReadOnlySynchronizedEvent<T>
{
/// <summary>
/// Wait for an event to be fired.
/// </summary>
/// <param name="cancelToken">Token to be able to cancel the task.</param>
/// <returns>Task that completes when an event is received or is cancelled.</returns>
Task<T> WaitAsync(CancellationToken cancelToken = default);
/// <summary>
/// Subscribe to events.
/// </summary>
/// <param name="action">Action to invoke when an event is fired.</param>
/// <param name="subscriptionToken">
/// Token to use for unsubscribing, if none is provided then the 'action' will be used.
/// </param>
/// <param name="callOnCapturedContext">
/// Should the action only be called on the SynchronizationContext that was active when
/// subscribing.
/// </param>
void Subscribe(Action<T> action, object subscriptionToken = null, bool callOnCapturedContext = true);
/// <summary>
/// Subscribe to events with an action that also receives a caller supplied state object.
/// </summary>
/// <remarks>
/// State parameter can be used to avoid having to use a closure to capture state.
/// </remarks>
/// <param name="action">Action to invoke when an event is fired.</param>
/// <param name="state">State that is passed to the action.</param>
/// <param name="subscriptionToken">
/// Token to use for unsubscribing, if none is provided then the 'action' will be used.
/// </param>
/// <param name="callOnCapturedContext">
/// Should the action only be called on the SynchronizationContext that was active when
/// subscribing.
/// </param>
void Subscribe(
Action<T, object> action,
object state,
object subscriptionToken = null,
bool callOnCapturedContext = true);
/// <summary>
/// Unsubscribe from events using the action that was subscribed.
/// </summary>
/// <remarks>
/// Only finds subscriptions that use the action as their token, which is the case when no
/// explicit 'subscriptionToken' was passed to 'Subscribe'.
/// </remarks>
/// <param name="action">Action that was used as the subscriptionToken.</param>
/// <returns>True if successfully unsubscribed otherwise False.</returns>
bool Unsubscribe(Action<T> action);
/// <summary>
/// Unsubscribe from events using the token that was passed when subscribing.
/// </summary>
/// <remarks>
/// The <see cref="SynchronizedEvent{T}"/> implementation compares tokens with 'Equals' and
/// removes every subscription with a matching token.
/// </remarks>
/// <param name="subscriptionToken">Token that was used for subscribing.</param>
/// <returns>True if successfully unsubscribed otherwise False.</returns>
bool Unsubscribe(object subscriptionToken);
}
/// <summary>
/// Thread-safe event that supports task and callback style listening.
/// </summary>
/// <remarks>
/// Thread safety: all members can be called concurrently. Subscriber callbacks are invoked
/// outside of the internal locks, so a callback is free to call back into the event
/// (Subscribe / Unsubscribe / WaitAsync / Invoke).
///
/// After <see cref="Dispose"/>: 'Invoke' and 'Subscribe' throw
/// <see cref="ObjectDisposedException"/>, 'WaitAsync' returns a cancelled task and
/// 'Unsubscribe' returns false.
/// </remarks>
/// <typeparam name="T">Type of the event data.</typeparam>
public sealed class SynchronizedEvent<T> : IReadOnlySynchronizedEvent<T>, IDisposable
{
    /* Scratch lists that 'InvokeInternal' reuses to avoid allocating on every invoke.
    They are thread-static and shared by every event with the same 'T' on a thread, so
    'InvokeInternal' takes a list out of its slot while using it; a nested invoke on the same
    thread then finds an empty slot and creates its own list. */
    [ThreadStatic] private static List<AwaiterHandle> awaiterInvokeList;
    [ThreadStatic] private static List<SubscribeHandle> subInvokeList;

    private readonly IExceptionHandler exceptionHandler;
    private readonly bool storeUnobservedData;
    private readonly bool allowSynchronousContinuations;
    private readonly object awaitersLock = new object();
    private readonly List<AwaiterHandle> awaiters = new List<AwaiterHandle>();
    private readonly object subscribersLock = new object();
    private readonly List<SubscribeHandle> subscribers = new List<SubscribeHandle>();

    // Queue is created lazily, only events that actually store unobserved data pay for it.
    private readonly Lazy<ConcurrentQueue<T>> unobservedData = new Lazy<ConcurrentQueue<T>>();

    // Non-zero once 'Dispose' has been called.
    private volatile int disposeCount;

    /// <summary>
    /// Initializes a new instance of the <see cref="SynchronizedEvent{T}"/> class.
    /// </summary>
    /// <param name="exceptionHandler">Handler for dealing with exceptions during callback invoke.</param>
    /// <param name="storeUnobservedData">Should data be stored when there is no-one listening.</param>
    /// <param name="allowSynchronousContinuations">
    /// Are task-continuations allowed to execute synchronously.
    /// Use with caution as it's easy to shoot yourself in the foot with this and cause deadlocks.
    /// For example if an awaiter would block its thread after the await that would also block the
    /// 'Invoke' call of this event.
    /// Why would you ever want to enable this? Well if you know it's safe in your usecase then it's
    /// faster as it requires less context switching.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown when 'exceptionHandler' is null.</exception>
    public SynchronizedEvent(
        IExceptionHandler exceptionHandler,
        bool storeUnobservedData = false,
        bool allowSynchronousContinuations = false)
    {
        this.exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler));
        this.storeUnobservedData = storeUnobservedData;
        this.allowSynchronousContinuations = allowSynchronousContinuations;
    }

    /// <summary>
    /// Check if the given task was created by <see cref="SynchronizedEvent{T}"/>.
    /// </summary>
    /// <remarks>
    /// Only recognizes tasks from events with the same type argument 'T' as this class.
    /// </remarks>
    /// <param name="task">Task to check.</param>
    /// <returns>True if task was created by <see cref="SynchronizedEvent{T}"/> otherwise False.</returns>
    /// <exception cref="ArgumentNullException">Thrown when 'task' is null.</exception>
    public static bool IsOwnedBy(Task task)
    {
        if (task is null)
            throw new ArgumentNullException(nameof(task));

        // The handle stores itself in the async-state of the task it creates.
        return task.AsyncState is AwaiterHandle;
    }

    /// <summary>
    /// Attempt to cancel a task that is created by <see cref="SynchronizedEvent{T}"/>.
    /// </summary>
    /// <remarks>
    /// Returns false when given a task that was not created from <see cref="SynchronizedEvent{T}"/>.
    /// </remarks>
    /// <param name="task">Task to attempt to cancel.</param>
    /// <returns>True if successfully cancelled, otherwise False.</returns>
    /// <exception cref="ArgumentNullException">Thrown when 'task' is null.</exception>
    public static bool TryCancel(Task task)
    {
        if (task is null)
            throw new ArgumentNullException(nameof(task));

        return task.AsyncState is AwaiterHandle handle && handle.TryCancel();
    }

    /// <inheritdoc/>
    public Task<T> WaitAsync(CancellationToken cancelToken = default) =>
        this.WaitAsyncInternal(cancelToken);

    /// <inheritdoc/>
    public void Subscribe(
        Action<T> action,
        object subscriptionToken = null,
        bool callOnCapturedContext = true)
    {
        if (action is null)
            throw new ArgumentNullException(nameof(action));

        this.SubscribeInternal(action, state: null, subscriptionToken ?? action, callOnCapturedContext);
    }

    /// <inheritdoc/>
    public void Subscribe(
        Action<T, object> action,
        object state,
        object subscriptionToken = null,
        bool callOnCapturedContext = true)
    {
        if (action is null)
            throw new ArgumentNullException(nameof(action));

        this.SubscribeInternal(action, state, subscriptionToken ?? action, callOnCapturedContext);
    }

    /// <inheritdoc/>
    public bool Unsubscribe(Action<T> action)
    {
        if (action is null)
            throw new ArgumentNullException(nameof(action));

        return this.UnsubscribeInternal(action);
    }

    /// <inheritdoc/>
    public bool Unsubscribe(object subscriptionToken)
    {
        if (subscriptionToken is null)
            throw new ArgumentNullException(nameof(subscriptionToken));

        return this.UnsubscribeInternal(subscriptionToken);
    }

    /// <summary>
    /// Invoke the event.
    /// </summary>
    /// <remarks>
    /// Completes all pending 'WaitAsync' tasks and calls all subscribers. If nobody observed the
    /// data and storing of unobserved data is enabled then the data is queued for the next
    /// awaiter or subscriber.
    /// </remarks>
    /// <param name="data">Data of the event.</param>
    /// <exception cref="ObjectDisposedException">Thrown when the event has been disposed.</exception>
    public void Invoke(T data) => this.InvokeInternal(data);

    /// <summary>
    /// Cancels all pending awaiters and drops all subscribers.
    /// Can be called multiple times, only the first call has an effect.
    /// </summary>
    public void Dispose()
    {
        // Using 'Interlocked.Increment' to make sure we only dispose once even when called concurrently.
        if (Interlocked.Increment(ref this.disposeCount) == 1)
            this.DisposeInternal();
    }

    private Task<T> WaitAsyncInternal(CancellationToken cancelToken = default)
    {
        // If cancellation is already requested then early out.
        if (cancelToken.IsCancellationRequested)
            return Task.FromCanceled<T>(cancelToken);

        /* If the event has been disposed then early out with a cancelled task.
        Note: 'Task.FromCanceled' throws when given a token that is not cancelled, so we cannot
        pass the callers token here. */
        if (this.disposeCount != 0)
            return Task.FromCanceled<T>(new CancellationToken(true));

        // If there is any unobserved data then return that.
        if (this.unobservedData.IsValueCreated)
        {
            if (this.unobservedData.Value.TryDequeue(out var data))
                return Task.FromResult(data);
        }

        // Otherwise create a handle to wait for an invoke.
        var handle = new AwaiterHandle(cancelToken, this.allowSynchronousContinuations);

        /* Register the handle.
        Note: The disposed state is checked again while holding the lock, 'DisposeInternal' takes
        the same lock so the handle is either cancelled by the dispose or never registered.
        Note: A handle whose token gets cancelled later stays in the list until the next invoke
        or dispose. */
        var registered = false;
        lock (this.awaitersLock)
        {
            if (this.disposeCount == 0)
            {
                this.awaiters.Add(handle);
                registered = true;
            }
        }

        // Disposed in the meantime: cancel the task, otherwise it would never complete.
        if (!registered)
            handle.Dispose();

        return handle.WaitForInvoke;
    }

    private void SubscribeInternal(
        object action,
        object state,
        object subscriptionToken,
        bool callOnCapturedContext = true)
    {
        Debug.Assert(action is Action<T> || action is Action<T, object>, "Invalid action type");
        Debug.Assert(subscriptionToken != null, "Missing subscription token");

        // If the event has been disposed it can never receive messages anymore.
        if (this.disposeCount != 0)
            throw new ObjectDisposedException(nameof(SynchronizedEvent<T>));

        // Create handle for this subscription.
        var syncContext = callOnCapturedContext ? SynchronizationContext.Current : null;
        var handle = new SubscribeHandle(action, state, subscriptionToken, syncContext, this.exceptionHandler);

        lock (this.subscribersLock)
        {
            /* Check again while holding the lock, 'DisposeInternal' takes the same lock so a
            subscriber can never be added after the dispose has cleaned up. */
            if (this.disposeCount != 0)
                throw new ObjectDisposedException(nameof(SynchronizedEvent<T>));

            this.subscribers.Add(handle);
        }

        // Handle any unobserved data (outside of the lock as this calls user code).
        if (this.unobservedData.IsValueCreated)
        {
            var queue = this.unobservedData.Value;
            while (queue.TryDequeue(out var data))
                handle.Invoke(data);
        }
    }

    private bool UnsubscribeInternal(object subscriptionToken)
    {
        // If the event has been disposed then there is no need to unsubscribe anymore.
        if (this.disposeCount != 0)
            return false;

        var removed = false;

        // Remove all subscribers with the same 'subscriptionToken'.
        lock (this.subscribersLock)
        {
            // Iterating backwards so that removing does not shift the entries we still have to visit.
            for (int i = this.subscribers.Count - 1; i >= 0; i--)
            {
                if (this.subscribers[i].SubscriptionToken.Equals(subscriptionToken))
                {
                    // Safe to call dispose here while holding the lock because it only sets a bool.
                    this.subscribers[i].Dispose();
                    this.subscribers.RemoveAt(i);
                    removed = true;
                }
            }
        }

        return removed;
    }

    private void InvokeInternal(T data)
    {
        if (this.disposeCount != 0)
            throw new ObjectDisposedException(nameof(SynchronizedEvent<T>));

        // Keep track if someone was invoked with this data.
        var observed = false;

        /* Complete all the awaiters.
        Note: We first make a list of things to invoke before invoking them, reason is
        otherwise there would be a deadlock if a continuation that executes synchronously
        would call 'WaitAsync' on this event again.
        Note: The cached list is taken out of the thread-static slot while we use it, that way
        an 'Invoke' from a synchronous continuation (on this or any other event with the same
        'T') cannot modify the list we are enumerating. */
        var awaitersToComplete = awaiterInvokeList ?? new List<AwaiterHandle>();
        awaiterInvokeList = null;
        try
        {
            // Gather all awaiters to invoke and clear them.
            lock (this.awaitersLock)
            {
                awaitersToComplete.AddRange(this.awaiters);
                this.awaiters.Clear();
            }

            // Invoke the awaiters, awaiters that were already cancelled do not count as observers.
            foreach (var awaiter in awaitersToComplete)
                observed |= awaiter.TryComplete(data);
        }
        finally
        {
            // Clear before caching so the cache does not keep the handles alive.
            awaitersToComplete.Clear();
            awaiterInvokeList = awaitersToComplete;
        }

        /* Invoke all the subscribers.
        Note: We first make a list of things to invoke before invoking them, reason is
        otherwise there would be a deadlock if someone called Subscribe / Unsubscribe from
        inside a invoke call.
        Note: Same as above, the cached list is taken out of its slot so that a subscriber can
        call 'Invoke' again on the same thread. */
        var subsToInvoke = subInvokeList ?? new List<SubscribeHandle>();
        subInvokeList = null;
        try
        {
            // Gather all the subscribers to invoke.
            lock (this.subscribersLock)
            {
                subsToInvoke.AddRange(this.subscribers);
            }

            // Invoke the subscribers.
            foreach (var sub in subsToInvoke)
                sub.Invoke(data);

            observed |= subsToInvoke.Count != 0;
        }
        finally
        {
            subsToInvoke.Clear();
            subInvokeList = subsToInvoke;
        }

        // Store data that was not observed.
        if (!observed && this.storeUnobservedData)
            this.unobservedData.Value.Enqueue(data);
    }

    private void DisposeInternal()
    {
        // Dispose all the awaiters.
        lock (this.awaitersLock)
        {
            /* It is safe to call 'Dispose' here while holding the lock because we've already
            marked the event as disposed so new calls to 'WaitAsync' will be denied so they
            cannot cause a deadlock */
            foreach (var awaiter in this.awaiters)
                awaiter.Dispose();
            this.awaiters.Clear();
        }

        // Dispose all the subscribers.
        lock (this.subscribersLock)
        {
            foreach (var sub in this.subscribers)
                sub.Dispose();
            this.subscribers.Clear();
        }
    }

    /// <summary>
    /// Single pending 'WaitAsync' call, wraps the task that is handed to the caller.
    /// </summary>
    private sealed class AwaiterHandle
    {
        private readonly TaskCompletionSource<T> completeSource;
        private readonly CancellationTokenRegistration cancelReg;

        internal AwaiterHandle(CancellationToken cancelToken, bool allowSynchronousContinuations)
        {
            /* Save this handle in the async state of the 'WaitForInvoke' task, that way we
            can cancel a task by fishing the handle out of its state. */
            this.completeSource = new TaskCompletionSource<T>(
                state: this,
                allowSynchronousContinuations ? TaskCreationOptions.None : TaskCreationOptions.RunContinuationsAsynchronously);

            // Cancel the task when the token of the caller is cancelled.
            this.cancelReg = cancelToken.Register(this.Dispose, useSynchronizationContext: false);
        }

        /// <summary>Task that completes when the handle is completed or cancelled.</summary>
        internal Task<T> WaitForInvoke
        {
            get
            {
                var result = this.completeSource.Task;
                Debug.Assert(result.AsyncState == this, "Invalid async-state");
                return result;
            }
        }

        /// <summary>Complete the task with the given data.</summary>
        /// <returns>False if the task was already completed or cancelled, otherwise True.</returns>
        internal bool TryComplete(T data)
        {
            // The token registration is no longer needed after this.
            this.cancelReg.Dispose();
            return this.completeSource.TrySetResult(data);
        }

        /// <summary>Cancel the task.</summary>
        /// <returns>False if the task was already completed or cancelled, otherwise True.</returns>
        internal bool TryCancel()
        {
            this.cancelReg.Dispose();
            return this.completeSource.TrySetCanceled();
        }

        internal void Dispose() => this.TryCancel();
    }

    /// <summary>
    /// Single subscription, knows how to call the user action on the right context.
    /// </summary>
    private sealed class SubscribeHandle
    {
        private readonly object action;
        private readonly object state;
        private readonly object subscriptionToken;
        private readonly SynchronizationContext context;
        private readonly IExceptionHandler exceptionHandler;

        // Cached so posting to the sync-context does not allocate a new delegate every time.
        private readonly SendOrPostCallback postCallback;

        private volatile bool isDisposed;

        internal SubscribeHandle(
            object action,
            object state,
            object subscriptionToken,
            SynchronizationContext context,
            IExceptionHandler exceptionHandler)
        {
            Debug.Assert(action != null && (action is Action<T> || action is Action<T, object>), "Invalid action");
            Debug.Assert(subscriptionToken != null, "Missing subscription token");
            Debug.Assert(exceptionHandler != null, "Missing exception handler");

            this.action = action;
            this.state = state;
            this.subscriptionToken = subscriptionToken;
            this.context = context;
            this.exceptionHandler = exceptionHandler;
            this.postCallback = this.OnPost;
        }

        internal object SubscriptionToken => this.subscriptionToken;

        internal void Invoke(T data)
        {
            // Execute immediately if no context is required or if we are on the right context.
            if (this.context == null || SynchronizationContext.Current == this.context)
            {
                this.InvokeActionInline(data);
            }
            else
            {
                this.context.Post(this.postCallback, data);
            }
        }

        internal void Dispose() => this.isDisposed = true;

        private void OnPost(object input)
        {
            // Null is a valid value when 'T' is a reference (or nullable) type.
            Debug.Assert(
                input is T || (input == null && default(T) == null),
                "Posted input is of incorrect type");

            this.InvokeActionInline((T)input);
        }

        private void InvokeActionInline(T input)
        {
            /* Check if the handle has been disposed as otherwise its possible that user action
            is invoked after unsubscribing (if it was already posted to the sync-context or if
            it was unsubscribed by an earlier callback of the same invoke) */
            if (this.isDisposed)
                return;

            try
            {
                if (this.action is Action<T> statelessAction)
                    statelessAction.Invoke(input);
                else
                    ((Action<T, object>)this.action).Invoke(input, this.state);
            }
            catch (Exception e)
            {
                // Exceptions of user code never propagate into the code that invoked the event.
                this.exceptionHandler.Handle(e);
            }
        }
    }
}