Last active
August 21, 2022 20:46
-
-
Save mold/e516738581a0b68423c65f44e1c3bca3 to your computer and use it in GitHub Desktop.
An rxjs combineLatest that takes an object of key/observable pairs and emits an object of key/values when any of the inner observables emits (typescript).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { combineLatest, noop, Observable } from 'rxjs'; | |
import { debounceTime, map, shareReplay, startWith, tap } from 'rxjs/operators'; | |
/**
 * Dictionary of named sources. Every entry is either a bare observable or
 * an `[observable, startValue]` tuple that pairs an observable with the
 * value it should start with.
 */
export interface OperatorDict<X> {
  [key: string]:
    | Observable<X>
    | [Observable<X>, X];
}
/**
 * Resolves to the value type `T` carried by an `Observable<T>`.
 * Anything that is not an observable resolves to `never`.
 */
export type ExtractObservableType<A> =
  A extends Observable<infer TValue>
    ? TValue
    : never;
/**
 * Behaviour switches accepted by `combineLatestToObject`. Every field is
 * optional; the documented default applies when a field is left out.
 */
export interface ICombineLatestOptions {
  /**
   * When enabled, the combined emission is passed through
   * `debounceTime(0)` before being turned into an object.
   *
   * @default true
   */
  debounce?: boolean;

  /**
   * When enabled, every bare observable is started with `null`.
   * Entries given as `[observable, startValue]` tuples keep their own
   * start value; it is never replaced by `null`.
   *
   * @default false
   */
  startWithNull?: boolean;

  /**
   * When enabled, each emitted state object is written to the console.
   *
   * @default false
   */
  logState?: boolean;

  /**
   * When enabled, the resulting observable is shared and replays its
   * latest value to new subscribers.
   *
   * @default true
   */
  shareReplay?: boolean;
}
/**
 * Builds a pass-through operator (a `tap` with a no-op callback). Used as
 * the "do nothing" branch wherever an operator is applied conditionally.
 */
function nop<T>() {
  return tap<T>(noop);
}
/**
 * Combines a dictionary of observables into a single observable of plain
 * objects.
 *
 * Takes a key/value object whose values are observables or
 * `[observable, startValue]` tuples:
 *
 * ```
 * {
 *   obs1: of(123),
 *   obs2: [of("value").pipe(delay(1000)), "startWith value"],
 * }
 * ```
 *
 * and, every time one of the source observables emits, emits an object
 * holding the latest value of every source under its original key:
 *
 * ```
 * {
 *   obs1: 123,
 *   obs2: "startWith value",
 * }
 * ```
 *
 * Tuple entries are always seeded with their own start value. Bare
 * observables are seeded with `null` only when `startWithNull` is set;
 * note that the declared output type does not include that `null`.
 *
 * @param observables Dictionary of sources. Its keys are read once, when
 * this function is called.
 * @param options Optional behaviour switches, see
 * {@link ICombineLatestOptions} for the individual flags and defaults.
 * @returns An observable of objects keyed like `observables`.
 */
export const combineLatestToObject = <
  TIn extends OperatorDict<any>,
  TOut extends { [K in keyof TIn]: ExtractObservableType<TIn[K] extends Array<any> ? TIn[K][0] : TIn[K]> }
>(
  observables: TIn,
  {
    debounce = true,
    startWithNull = false,
    logState = false,
    shareReplay: doShareReplay = true,
  }: ICombineLatestOptions = {},
): Observable<TOut> => {
  // Snapshot the keys so the positional results of combineLatest can be
  // mapped back to their names.
  const keys = Object.keys(observables);

  const sources = keys.map(key => {
    const entry = observables[key];
    return Array.isArray(entry)
      ? // Tuple form: the second element is the explicit start value.
        entry[0].pipe(startWith(entry[1]))
      : entry.pipe(startWithNull ? startWith(null) : nop());
  });

  return combineLatest(sources).pipe(
    debounce ? debounceTime(0) : nop(),
    // Fold the positional values into one fresh object per emission in a
    // single pass (no re-spreading of the accumulator for every key).
    map(values =>
      values.reduce((state, value, i) => {
        state[keys[i]] = value;
        return state;
      }, {}),
    ),
    logState ? tap(state => console.log({ state })) : nop(),
    // keep this at the end
    doShareReplay ? shareReplay({ refCount: true, bufferSize: 1 }) : nop(),
  );
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment