Skip to content

Instantly share code, notes, and snippets.

Last active February 17, 2019 19:13
Show Gist options
  • Save Misterhex/7c1dbd0e88fb57c2e98780fd09b40668 to your computer and use it in GitHub Desktop.
Save Misterhex/7c1dbd0e88fb57c2e98780fd09b40668 to your computer and use it in GitHub Desktop.
Rx extensions from reactive trader
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace Adaptive.ReactiveTrader.Shared.Extensions
public static class ObservableExtensions
/// <summary>
/// Wraps a connectable observable so that it is connected on first subscription instead of eagerly.
/// </summary>
/// <typeparam name="T">element type</typeparam>
/// <param name="connectable">the connectable observable to subscribe to and connect</param>
/// <param name="futureDisposable">receives the connection handle once the first subscriber has connected the source</param>
/// <returns>an observable whose first subscription triggers <c>connectable.Connect()</c></returns>
public static IObservable<T> LazilyConnect<T>(this IConnectableObservable<T> connectable, SingleAssignmentDisposable futureDisposable)
{
    // 0 = no connection attempted yet, 1 = the first subscriber has already attempted it.
    // Captured by the factory below, so it is shared by every subscription to the returned observable.
    var connected = 0;

    return Observable.Create<T>(observer =>
    {
        // Subscribe before connecting so the first subscriber cannot miss notifications produced on connect.
        var subscription = connectable.Subscribe(observer);

        // Only the subscriber that flips the flag from 0 to 1 is allowed to connect.
        if (Interlocked.CompareExchange(ref connected, 1, 0) == 0)
        {
            // If the caller already disposed the handle, the source is never connected
            // (the flag stays at 1, so later subscribers will not connect it either).
            if (!futureDisposable.IsDisposed)
            {
                futureDisposable.Disposable = connectable.Connect();
            }
        }

        return subscription;
    });
}
/// <summary>
/// Takes only the first element of <paramref name="observable"/> and shares it through
/// <c>PublishLast()</c>, connecting to the source lazily on first subscription.
/// </summary>
/// <typeparam name="T">element type</typeparam>
/// <param name="observable">source stream</param>
/// <returns>an observable built from <c>Take(1).PublishLast()</c> that connects when first subscribed to</returns>
public static IObservable<T> CacheFirstResult<T>(this IObservable<T> observable)
{
    // We are happy to lose the underlying subscription here because we have .Take(1) the source stream.
    return observable.Take(1).PublishLast().LazilyConnect(new SingleAssignmentDisposable());
}
// Builds a new observable around `source`: the visible code subscribes to `source` and evaluates `predicate`
// for each item it receives.
// NOTE(review): this listing stops right after the `if (predicate(item))` test. The statement it guards, any
// forwarding of `item` to `observer`, the remaining Subscribe arguments and all block delimiters are not visible.
// Judging by the method name, the sequence presumably completes after emitting the first item that matches
// `predicate` - confirm against the original file before relying on this.
public static IObservable<TSource> TakeUntilInclusive<TSource>(this IObservable<TSource> source, Func<TSource, Boolean> predicate)
return Observable.Create<TSource>(
observer => source.Subscribe(
item =>
// decide, per item, whether the stop condition has been reached
if (predicate(item))
/// <summary>
/// Runs a callback every time an observer subscribes to the returned observable.
/// </summary>
/// <typeparam name="T">element type</typeparam>
/// <param name="source">source stream</param>
/// <param name="onSubscribe">action invoked once per subscription, just before subscribing to <paramref name="source"/></param>
/// <returns>an observable that forwards everything from <paramref name="source"/></returns>
public static IObservable<T> OnSubscribe<T>(this IObservable<T> source, Action onSubscribe)
{
    return Observable.Create<T>(observer =>
    {
        // Invoke the callback first so it is guaranteed to have run before the source can push anything.
        onSubscribe();
        return source.Subscribe(observer);
    });
}
// NOTE(review): this listing has lost its block delimiters and several statements. Nothing visible here delivers
// `localNotification` to `observer`, and the `if (hasPendingNotification)` branch has no body (presumably it
// re-invokes `self` - confirm). The comments below describe only what is visible.
// Purpose (as far as visible): notifications from `source` are materialized, the most recently received one is
// stored in `outsideNotification`, and work is scheduled on `scheduler` only when no delivery is already active.
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
return Observable.Create<T>(observer =>
// lock object used around every visible access to `active` and `outsideNotification`
var gate = new object();
// true while a scheduled delivery is outstanding
bool active = false;
// holds the currently scheduled work item
var cancelable = new SerialDisposable();
// Materialize() surfaces each source event as a Notification<T> value handled by the single callback below
var disposable = source.Materialize().Subscribe(thisNotification =>
bool wasNotAlreadyActive;
// NOTE(review): with delimiters missing it is unclear in which scope this variable is declared; the scheduled
// action below reads and clears it, so it is presumably shared across notifications - confirm.
Notification<T> outsideNotification;
lock (gate)
// remember whether a delivery was already pending, then record the newest notification
wasNotAlreadyActive = !active;
active = true;
outsideNotification = thisNotification;
// only the notification that finds the pipeline idle schedules work
if (wasNotAlreadyActive)
cancelable.Disposable = scheduler.Schedule(self =>
Notification<T> localNotification;
lock (gate)
// take ownership of the stored notification and clear the slot
localNotification = outsideNotification;
outsideNotification = null;
bool hasPendingNotification;
lock (gate)
// stay active only if another notification arrived in the meantime
hasPendingNotification = active = (outsideNotification != null);
if (hasPendingNotification)
// disposing the result disposes both the source subscription and the scheduled work
return new CompositeDisposable(disposable, cancelable);
// NOTE(review): the code below has lost its block delimiters and several statements (the Subscribe call on
// `source`, the calls that push `x` to `observer`, the error handler and the completion calls are not visible).
// Inline comments describe only what is visible; confirm details against the original file.
/// <summary>
/// Applies a conflation algorithm to an observable stream.
/// Whenever the stream calls OnNext twice within minimumUpdatePeriod, the second update is delayed so that minimumUpdatePeriod is respected.
/// If more than two updates arrive in that window, only the last update is pushed.
/// Updates are pushed and rescheduled using the provided scheduler.
/// </summary>
/// <typeparam name="T">update type</typeparam>
/// <param name="source">stream</param>
/// <param name="minimumUpdatePeriod">minimum delay between 2 updates</param>
/// <param name="scheduler">to be used to publish updates and schedule delayed updates</param>
/// <returns>the conflated stream</returns>
public static IObservable<T> Conflate<T>(this IObservable<T> source, TimeSpan minimumUpdatePeriod, IScheduler scheduler)
return Observable.Create<T>(observer =>
// indicate when the last update was published
var lastUpdateTime = DateTimeOffset.MinValue;
// indicate if an update is currently scheduled
var updateScheduled = new MultipleAssignmentDisposable();
// indicate if completion has been requested (we can't complete immediately if an update is in flight)
var completionRequested = false;
// lock object taken around the visible accesses to lastUpdateTime, updateScheduled and completionRequested
var gate = new object();
var subscription = source
x =>
// timestamp of this update, read from the scheduler's clock
var currentUpdateTime = scheduler.Now;
bool scheduleRequired;
lock (gate)
// too soon after the previous publication: this update has to be deferred
scheduleRequired = currentUpdateTime - lastUpdateTime < minimumUpdatePeriod;
// a deferred update is already pending: drop the reference to it so the newer value takes its place
// NOTE(review): no Dispose() call on the previous schedule is visible here - confirm it is cancelled
if (scheduleRequired && updateScheduled.Disposable != null)
updateScheduled.Disposable = null;
if (scheduleRequired)
// defer until a full minimumUpdatePeriod has elapsed since the last publication
updateScheduled.Disposable = scheduler.Schedule(lastUpdateTime + minimumUpdatePeriod, () =>
lock (gate)
// record the publication time and mark that nothing is scheduled any more
lastUpdateTime = scheduler.Now;
updateScheduled.Disposable = null;
// completion was requested while this update was pending (the guarded statement is not visible)
if (completionRequested)
lock (gate)
// record the publication time (presumably the immediate, non-deferred path - confirm)
lastUpdateTime = scheduler.Now;
() =>
// if we have scheduled an update we need to complete once the update has been published
if (updateScheduled.Disposable != null)
lock (gate)
completionRequested = true;
// NOTE(review): only the source subscription is returned; `updateScheduled` is not part of the returned disposable
return subscription;
// NOTE(review): the code below has lost its block delimiters and several statements (the Subscribe call on the
// timer, any invocation of `scheduleHeartbeats`, the statements that cancel the pending timer, and the
// error/completion handlers are not visible). Inline comments describe only what is visible.
/// <summary>
/// Injects heartbeats into a stream when the source stream becomes quiet:
/// - upon subscription, if the source does not OnNext any update, a heartbeat is pushed after heartbeatPeriod and then periodically until the source produces an update
/// - when an update is received it is pushed immediately. After this update, if the source does not OnNext within heartbeatPeriod, heartbeats are pushed
/// </summary>
/// <typeparam name="T">update type</typeparam>
/// <param name="source">source stream</param>
/// <param name="heartbeatPeriod">delay before the first heartbeat and interval between subsequent heartbeats</param>
/// <param name="scheduler">scheduler the heartbeat timer runs on</param>
/// <returns>stream in which source updates and heartbeats are both pushed as Heartbeat instances</returns>
public static IObservable<IHeartbeat<T>> Heartbeat<T>(this IObservable<T> source, TimeSpan heartbeatPeriod, IScheduler scheduler)
return Observable.Create<IHeartbeat<T>>(observer =>
// holds the subscription to the currently running heartbeat timer
var heartbeatTimerSubscription = new MultipleAssignmentDisposable();
// lock object taken around the visible accesses to heartbeatTimerSubscription
var gate = new object();
// starts a timer that first fires after heartbeatPeriod and then every heartbeatPeriod
Action scheduleHeartbeats = () =>
var disposable = Observable
.Timer(heartbeatPeriod, heartbeatPeriod, scheduler)
// every tick pushes a heartbeat constructed without a value
_ => observer.OnNext(new Heartbeat<T>()));
lock (gate)
heartbeatTimerSubscription.Disposable = disposable;
var sourceSubscription = source.Subscribe(
x =>
lock (gate)
// cancel any scheduled heartbeat
// push the real update, wrapped in a Heartbeat constructed from the value
observer.OnNext(new Heartbeat<T>(x));
// disposing the result disposes both the source subscription and the heartbeat timer holder
return new CompositeDisposable { sourceSubscription, heartbeatTimerSubscription };
// NOTE(review): the code below has lost its block delimiters and several statements (any invocation of
// `scheduleStale`, the statement guarded by `if (disposable != null)`, and the error/completion handlers are
// not visible). Inline comments describe only what is visible.
/// <summary>
/// Detects when a stream becomes inactive for some period of time
/// </summary>
/// <typeparam name="T">update type</typeparam>
/// <param name="source">source stream</param>
/// <param name="stalenessPeriod">if the source stream does not OnNext any update during this period, it is declared stale</param>
/// <param name="scheduler">scheduler the staleness timer runs on</param>
/// <returns>stream in which source updates and stale markers are both pushed as Stale instances</returns>
public static IObservable<IStale<T>> DetectStale<T>(this IObservable<T> source, TimeSpan stalenessPeriod, IScheduler scheduler)
return Observable.Create<IStale<T>>(observer =>
// holds the currently running staleness timer
var timerSubscription = new SerialDisposable();
// lock object taken around the visible calls to observer.OnNext
var observerLock = new object();
// starts a one-shot timer that fires after stalenessPeriod
Action scheduleStale = () =>
timerSubscription.Disposable = Observable
.Timer(stalenessPeriod, scheduler)
.Subscribe(_ =>
lock (observerLock)
// the timer elapsed: push a marker constructed without a value
observer.OnNext(new Stale<T>());
var sourceSubscription = source.Subscribe(
x =>
// cancel any scheduled stale update
var disposable = timerSubscription.Disposable;
if (disposable != null)
lock (observerLock)
// push the real update, wrapped in a Stale constructed from the value
observer.OnNext(new Stale<T>(x));
// disposing the result disposes both the source subscription and the staleness timer
return new CompositeDisposable { sourceSubscription, timerSubscription };
/// <summary>
/// Exposes the <c>PropertyChanged</c> event of <paramref name="source"/> as an observable that
/// produces <c>Unit.Default</c> each time the event is raised, whichever property changed.
/// </summary>
/// <param name="source">object whose <c>PropertyChanged</c> event is observed</param>
/// <returns>a stream with one <c>Unit</c> per raised <c>PropertyChanged</c> event</returns>
public static IObservable<Unit> ObserveProperty(this INotifyPropertyChanged source)
{
    return Observable.FromEvent<PropertyChangedEventHandler, PropertyChangedEventArgs>(
            // adapt the Rx callback to the event's (sender, args) delegate shape, discarding the sender
            h => (s, e) => h(e),
            h => source.PropertyChanged += h,
            h => source.PropertyChanged -= h)
        .Select(_ => Unit.Default);
}
/// <summary>
/// Observes the value of a single property of <paramref name="source"/>, re-reading it every time
/// <c>PropertyChanged</c> is raised with that property's name.
/// </summary>
/// <typeparam name="TSource">type of the observed object</typeparam>
/// <typeparam name="TProp">type of the observed property</typeparam>
/// <param name="source">object whose property is observed</param>
/// <param name="propertyExpression">member access selecting the property, e.g. <c>x => x.Price</c></param>
/// <param name="observeInitialValue">when true, the current property value is emitted first on subscription</param>
/// <returns>a stream of property values</returns>
public static IObservable<TProp> ObserveProperty<TSource, TProp>(this TSource source,
    Expression<Func<TSource, TProp>> propertyExpression,
    bool observeInitialValue)
    where TSource : INotifyPropertyChanged
{
    return Observable.Create<TProp>(o =>
    {
        // Resolved per subscription: the member name to filter on and a compiled getter for the property.
        var propertyName = GetPropertyName(source, propertyExpression);
        var selector = CompiledExpressionHelper<TSource, TProp>.GetFunc(propertyExpression);

        var observable
            = from evt in Observable.FromEvent<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                h => (s, e) => h(e),
                h => source.PropertyChanged += h,
                h => source.PropertyChanged -= h)
            where evt.PropertyName == propertyName
            select selector(source);

        // Optionally prepend the value the property holds at subscription time.
        observable = observeInitialValue ? observable.StartWith(selector(source)) : observable;

        return observable.Subscribe(o);
    });
}
/// <summary>
/// Observes changes of a single property of <paramref name="source"/> without emitting its current value on subscription.
/// </summary>
/// <typeparam name="TSource">type of the observed object</typeparam>
/// <typeparam name="TProp">type of the observed property</typeparam>
/// <param name="source">object whose property is observed</param>
/// <param name="propertyExpression">member access selecting the property, e.g. <c>x => x.Price</c></param>
/// <returns>a stream of property values, one per matching change notification</returns>
public static IObservable<TProp> ObserveProperty<TSource, TProp>(this TSource source,
    Expression<Func<TSource, TProp>> propertyExpression)
    where TSource : INotifyPropertyChanged
{
    // Delegates to the three-argument overload with observeInitialValue = false.
    return ObserveProperty(source, propertyExpression, false);
}
/// <summary>
/// Returns the name of the member selected by <paramref name="propertyExpression"/>.
/// </summary>
/// <typeparam name="TSource">type declaring the member</typeparam>
/// <typeparam name="TProp">type of the member</typeparam>
/// <param name="source">not read by this method; it only enables extension-method syntax and type inference</param>
/// <param name="propertyExpression">member access selecting the property, e.g. <c>x => x.Price</c></param>
/// <returns>the member's name</returns>
public static string GetPropertyName<TSource, TProp>(this TSource source,
    Expression<Func<TSource, TProp>> propertyExpression)
{
    var memberExpression = CompiledExpressionHelper<TSource, TProp>.GetMemberExpression(propertyExpression);
    return memberExpression.Member.Name;
}
/// <summary>
/// Caches compiled property getters per (<typeparamref name="TSource"/>, <typeparamref name="TProp"/>) pair
/// and extracts the member access from a property-selecting expression.
/// </summary>
private static class CompiledExpressionHelper<TSource, TProp>
{
    // Guards Funcs: the cache is static, so subscriptions on different threads can reach it concurrently.
    private static readonly object Gate = new object();

    // Compiled getters keyed by "<TSource full name>.<member name>".
    private static readonly Dictionary<string, Func<TSource, TProp>> Funcs = new Dictionary<string, Func<TSource, TProp>>();

    /// <summary>
    /// Returns a compiled delegate for <paramref name="propertyExpression"/>, compiling it only the first time
    /// a given member name is seen. Later expressions selecting a member with the same name reuse that delegate.
    /// </summary>
    /// <param name="propertyExpression">member access selecting the property</param>
    /// <returns>the cached compiled getter</returns>
    /// <exception cref="InvalidOperationException">the member is not accessed directly on the expression's root object</exception>
    public static Func<TSource, TProp> GetFunc(Expression<Func<TSource, TProp>> propertyExpression)
    {
        var memberExpression = GetMemberExpression(propertyExpression);
        var propertyName = memberExpression.Member.Name;
        var key = typeof (TSource).FullName + "." + propertyName;

        lock (Gate)
        {
            Func<TSource, TProp> func;
            if (!Funcs.TryGetValue(key, out func))
            {
                // Single lookup on the hit path; compile and store only on a miss.
                func = propertyExpression.Compile();
                Funcs[key] = func;
            }

            return func;
        }
    }

    /// <summary>
    /// Extracts the member access from <paramref name="propertyExpression"/>, looking through a
    /// <c>Convert</c> node when the body has one (for example a value boxed to <c>object</c>).
    /// </summary>
    /// <param name="propertyExpression">member access selecting the property</param>
    /// <returns>the member access node</returns>
    /// <exception cref="InvalidOperationException">the member is not accessed directly on the expression's root object</exception>
    public static MemberExpression GetMemberExpression(Expression<Func<TSource, TProp>> propertyExpression)
    {
        MemberExpression memberExpression;

        var unaryExpr = propertyExpression.Body as UnaryExpression;
        if (unaryExpr != null && unaryExpr.NodeType == ExpressionType.Convert)
        {
            // Body is Convert(member access): the member access is the operand.
            memberExpression = (MemberExpression) unaryExpr.Operand;
        }
        else
        {
            memberExpression = (MemberExpression) propertyExpression.Body;
        }

        // Expression is null for static members, which are not rooted on the lambda parameter either.
        var target = memberExpression.Expression;
        if (target == null
            || (target.NodeType != ExpressionType.Parameter && target.NodeType != ExpressionType.Constant))
        {
            throw new InvalidOperationException("Getting members not directly on the expression's root object has been disallowed.");
        }

        return memberExpression;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment