Skip to content

Instantly share code, notes, and snippets.

@ShlomoAbraham
Last active February 27, 2018 17:08
Show Gist options
  • Save ShlomoAbraham/1b6e8dc1c7e00fc59f48fc690f4375cf to your computer and use it in GitHub Desktop.
Save ShlomoAbraham/1b6e8dc1c7e00fc59f48fc690f4375cf to your computer and use it in GitHub Desktop.
An example of state with Rx
// LinqPad entry point: runs three demos of the stateful operators defined in Extensions.
void Main()
{
    // Factory for the demo input: a new Random per call, 100 draws of Next(-5, 10),
    // each paired with its position in the sequence as Tuple(value, index).
    Func<IObservable<Tuple<int, int>>> indexedRandoms = () =>
        Observable.Generate(new Random(), _ => true, rng => rng, rng => rng.Next(-5, 10))
            .Take(100)
            .Select((value, position) => Tuple.Create(value, position));

    // State transition used by every demo: running sum of the values found at even indexes.
    Func<int, Tuple<int, int>, int> sumEvenIndexed = (sum, pair) =>
    {
        if (pair.Item2 % 2 == 0)
        {
            return sum + pair.Item1;
        }

        return sum;
    };

    // Result selector for the first two demos: flip the sign while the running sum is above 10.
    // The sum passed in does not yet include the current element.
    Func<int, Tuple<int, int>, int> negateWhenOverTen = (sum, pair) =>
        sum > 10 ? pair.Item1 * -1 : pair.Item1;

    // Demo 1 - for each number: if the previous even-indexed numbers sum to more than 10, multiply it by -1.
    var numbers = indexedRandoms().StateSelect(0, negateWhenOverTen, sumEvenIndexed);
    numbers.Dump(); //LinqPad

    // Demo 2 - for each number: if the previous even-indexed sum plus the new number is more than 10, skip it.
    // Numbers that are kept still go through the same sign-flipping result selector as demo 1.
    Func<int, Tuple<int, int>, bool> keepWhileTotalAtMostTen = (sum, pair) => sum + pair.Item1 <= 10;
    var numbers2 = indexedRandoms().StateSelectFiltered(0, negateWhenOverTen, sumEvenIndexed, keepWhileTotalAtMostTen);
    numbers2.Dump(); //LinqPad

    // Demo 3 - for each number: if the previous even-indexed numbers sum to more than 10,
    // emit the number followed by an extra -1; otherwise emit just the number.
    var numbers3 = indexedRandoms().StateSelectMany(
        0,
        (sum, pair) => sum > 10
            ? new[] { pair.Item1, -1 }.ToObservable()
            : Observable.Return(pair.Item1),
        sumEvenIndexed);
    numbers3.Dump(); //LinqPad
}
// Define other methods and classes here
/// <summary>
/// Rx operators that carry an accumulated state value alongside an observable sequence,
/// so each element can be projected (or filtered) using the state built from the elements before it.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Projects each element to a single result using the state accumulated from the preceding elements.
    /// </summary>
    /// <param name="source">The sequence to project.</param>
    /// <param name="initialState">The state passed to the selectors for the first element.</param>
    /// <param name="resultSelector">Produces the output for an element from the state prior to that element.</param>
    /// <param name="stateSelector">Produces the next state from the state prior to an element and the element itself.</param>
    /// <returns>A sequence containing one result per source element.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/>, <paramref name="resultSelector"/> or <paramref name="stateSelector"/> is null.
    /// </exception>
    public static IObservable<TResult> StateSelect<TSource, TState, TResult>(
        this IObservable<TSource> source,
        TState initialState,
        Func<TState, TSource, TResult> resultSelector,
        Func<TState, TSource, TState> stateSelector
    )
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (resultSelector == null)
        {
            throw new ArgumentNullException("resultSelector");
        }

        if (stateSelector == null)
        {
            throw new ArgumentNullException("stateSelector");
        }

        // A single-element inner sequence turns the one-to-many operator into a one-to-one projection.
        return source
            .StateSelectMany(initialState, (state, item) => Observable.Return(resultSelector(state, item)), stateSelector);
    }

    /// <summary>
    /// Projects each element to an inner sequence using the state accumulated from the preceding elements,
    /// and flattens the inner sequences into one output sequence.
    /// </summary>
    /// <remarks>
    /// For every element, <paramref name="stateSelector"/> is invoked first and <paramref name="resultSelector"/>
    /// second; both receive the state as it was before the element. Inner sequences are flattened with
    /// SelectMany, so inner sequences that produce values asynchronously may interleave in the output.
    /// </remarks>
    /// <param name="source">The sequence to project.</param>
    /// <param name="initialState">The state passed to the selectors for the first element.</param>
    /// <param name="resultSelector">Produces the inner sequence for an element from the state prior to that element.</param>
    /// <param name="stateSelector">Produces the next state from the state prior to an element and the element itself.</param>
    /// <returns>The flattened results of all inner sequences.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/>, <paramref name="resultSelector"/> or <paramref name="stateSelector"/> is null.
    /// </exception>
    public static IObservable<TResult> StateSelectMany<TSource, TState, TResult>(
        this IObservable<TSource> source,
        TState initialState,
        Func<TState, TSource, IObservable<TResult>> resultSelector,
        Func<TState, TSource, TState> stateSelector
    )
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (resultSelector == null)
        {
            throw new ArgumentNullException("resultSelector");
        }

        if (stateSelector == null)
        {
            throw new ArgumentNullException("stateSelector");
        }

        // The accumulator is a (state, pending results) pair. The Empty sequence in the seed only
        // gives the pair its type; every pair that reaches SelectMany comes from the accumulator lambda.
        return source
            .Scan(Tuple.Create(initialState, Observable.Empty<TResult>()), (state, item) => Tuple.Create(stateSelector(state.Item1, item), resultSelector(state.Item1, item)))
            .SelectMany(t => t.Item2);
    }

    /// <summary>
    /// Like <c>StateSelect</c>, but drops the elements for which <paramref name="filter"/> returns false.
    /// </summary>
    /// <remarks>
    /// The state is still advanced by <paramref name="stateSelector"/> for elements that are filtered out.
    /// </remarks>
    /// <param name="source">The sequence to project.</param>
    /// <param name="initialState">The state passed to the selectors for the first element.</param>
    /// <param name="resultSelector">Produces the output for a kept element from the state prior to that element.</param>
    /// <param name="stateSelector">Produces the next state from the state prior to an element and the element itself.</param>
    /// <param name="filter">Decides, from the state prior to an element and the element itself, whether to emit a result.</param>
    /// <returns>A sequence containing one result per element accepted by <paramref name="filter"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/>, <paramref name="resultSelector"/>, <paramref name="stateSelector"/>
    /// or <paramref name="filter"/> is null.
    /// </exception>
    public static IObservable<TResult> StateSelectFiltered<TSource, TState, TResult>(
        this IObservable<TSource> source,
        TState initialState,
        Func<TState, TSource, TResult> resultSelector,
        Func<TState, TSource, TState> stateSelector,
        Func<TState, TSource, bool> filter
    )
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (resultSelector == null)
        {
            throw new ArgumentNullException("resultSelector");
        }

        if (stateSelector == null)
        {
            throw new ArgumentNullException("stateSelector");
        }

        if (filter == null)
        {
            throw new ArgumentNullException("filter");
        }

        // A rejected element maps to an empty inner sequence, so it contributes nothing to the output.
        return source
            .StateSelectMany(initialState, (state, item) =>
                filter(state, item) ? Observable.Return(resultSelector(state, item)) : Observable.Empty<TResult>(),
                stateSelector);
    }
}
/// <summary>
/// Alternative implementations of existing Rx operators, written in a recursive, functional style.
/// </summary>
public static class RxReimplementations
{
    /// <summary>
    /// A functional (recursive) implementation of Observable.Scan: folds <paramref name="f"/> over
    /// <paramref name="source"/> and emits every intermediate state.
    /// </summary>
    /// <remarks>
    /// Each element is handled by taking the head of the published source, emitting the new state, and
    /// recursing on the same published source for the remainder, so every element adds another nested
    /// Publish/Take/SelectMany layer. NOTE(review): presumably meant as a demonstration rather than a
    /// replacement for Scan - confirm before using on long sequences.
    /// </remarks>
    /// <param name="source">The sequence to fold over.</param>
    /// <param name="initialState">The state combined with the first element.</param>
    /// <param name="f">Combines the current state with an element to produce the next state.</param>
    /// <returns>The sequence of states produced after each element.</returns>
    public static IObservable<TState> FunctionalScan<TSource, TState>(this IObservable<TSource> source, TState initialState, Func<TState, TSource, TState> f)
    {
        return source.Publish(shared =>
        {
            // Only the first element is handled at this level; the recursion below handles the rest.
            var head = shared.Take(1);

            return head.SelectMany(first =>
            {
                var nextState = f(initialState, first);

                // Emit the freshly computed state, then continue with the remaining elements
                // using that state as the new seed.
                var current = Observable.Return(nextState);
                return current.Concat(shared.FunctionalScan(nextState, f));
            });
        });
    }

    /// <summary>
    /// Applies <paramref name="f"/> to <paramref name="t"/> and returns the result. This lets a computed
    /// value be named and reused inside a single expression.
    /// </summary>
    /// <param name="t">The value to hand to <paramref name="f"/>.</param>
    /// <param name="f">The function to apply.</param>
    /// <returns>The value returned by <paramref name="f"/>.</returns>
    public static U Using<T, U>(this T t, Func<T, U> f)
    {
        var result = f(t);
        return result;
    }
}
@ShlomoAbraham
Copy link
Author

There are three parts here: the Extensions class, which has methods to help observe observables with state; a Main runner that shows how to use the new extensions; and the RxReimplementations class, which shows how you can implement Scan in a functional way.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment