Skip to content

Instantly share code, notes, and snippets.

@robfe
Created October 7, 2013 22:51
Show Gist options
  • Save robfe/6876349 to your computer and use it in GitHub Desktop.
Save robfe/6876349 to your computer and use it in GitHub Desktop.
[Rx] A live SelectMany over the items currently inside an ObservableCollection
/// <summary>
/// Projects every item currently held by <paramref name="collection"/> to an observable
/// sequence and merges the notifications of those sequences into one stream. Items added
/// later are subscribed to as they arrive; items removed later are unsubscribed.
/// </summary>
/// <typeparam name="TSource">Type of the items stored in the collection.</typeparam>
/// <typeparam name="TResult">Type of the values produced by the projected sequences.</typeparam>
/// <param name="collection">The collection whose current items are tracked.</param>
/// <param name="selector">Produces the observable sequence to listen to for a given item.</param>
/// <returns>
/// An observable that forwards values and errors from the per-item sequences. Completion of
/// a single per-item sequence is not forwarded, because other items (and future items) may
/// still produce values.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="collection"/> or <paramref name="selector"/> is null.
/// </exception>
/// <remarks>
/// Items are used as dictionary keys, so null items are not supported.
/// </remarks>
public static IObservable<TResult> SelectMany<TSource, TResult>(this ObservableCollection<TSource> collection, Func<TSource, IObservable<TResult>> selector)
{
    if (collection == null)
    {
        throw new ArgumentNullException("collection");
    }
    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return Observable.Create<TResult>(observer =>
    {
        // assume events are raised on the same thread (i.e. dispatcher), so we don't need a concurrent dictionary.
        // Each item maps to a list so that an item occurring several times in the collection
        // keeps one subscription per occurrence instead of overwriting (and leaking) the earlier one.
        var subscriptions = new Dictionary<TSource, List<IDisposable>>();

        Action<TSource> subscribe = item =>
        {
            // Forward values and errors only: one inner sequence completing must not
            // terminate the merged stream while the collection is still live.
            var subscription = selector(item).Subscribe(observer.OnNext, observer.OnError);

            List<IDisposable> itemSubscriptions;
            if (!subscriptions.TryGetValue(item, out itemSubscriptions))
            {
                itemSubscriptions = new List<IDisposable>();
                subscriptions[item] = itemSubscriptions;
            }
            itemSubscriptions.Add(subscription);
        };

        Action<TSource> unsubscribe = item =>
        {
            List<IDisposable> itemSubscriptions;
            if (!subscriptions.TryGetValue(item, out itemSubscriptions))
            {
                // Nothing is tracked for this item; ignore rather than throw KeyNotFoundException.
                return;
            }

            // Release one occurrence of the item.
            var last = itemSubscriptions.Count - 1;
            itemSubscriptions[last].Dispose();
            itemSubscriptions.RemoveAt(last);
            if (itemSubscriptions.Count == 0)
            {
                subscriptions.Remove(item);
            }
        };

        Action unsubscribeAll = () =>
        {
            // Snapshot first so the dictionary is not being enumerated while disposing.
            var pending = new List<IDisposable>();
            foreach (var itemSubscriptions in subscriptions.Values)
            {
                pending.AddRange(itemSubscriptions);
            }
            subscriptions.Clear();
            foreach (var subscription in pending)
            {
                subscription.Dispose();
            }
        };

        foreach (var source in collection)
        {
            // subscribe selector to all items already in the collection
            subscribe(source);
        }

        var collectionChangeSubscription = Observable
            .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                handler => (sender, args) => handler(sender, args),
                x => collection.CollectionChanged += x,
                x => collection.CollectionChanged -= x)
            .Select(x => x.EventArgs)
            .Subscribe(evp =>
            {
                switch (evp.Action)
                {
                    case NotifyCollectionChangedAction.Move:
                        // The same items are still in the collection; their subscriptions stay as they are.
                        return;
                    case NotifyCollectionChangedAction.Reset:
                        // Reset carries no item lists (e.g. Clear()), so rebuild from the current contents.
                        unsubscribeAll();
                        foreach (var item in collection)
                        {
                            subscribe(item);
                        }
                        return;
                }

                // Handle removals before additions so that replacing an item with an equal
                // one releases the old subscription and keeps the new one.
                if (evp.OldItems != null)
                {
                    // unsubscribe the selector's subscriptions to removed items
                    foreach (TSource oldItem in evp.OldItems)
                    {
                        unsubscribe(oldItem);
                    }
                }
                if (evp.NewItems != null)
                {
                    // subscribe selector to all new items
                    foreach (TSource newItem in evp.NewItems)
                    {
                        subscribe(newItem);
                    }
                }
            });

        return () =>
        {
            // stop listening for changes to the ObservableCollection
            collectionChangeSubscription.Dispose();
            // clear any current subscriptions to items in the collection
            unsubscribeAll();
        };
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment