Skip to content

Instantly share code, notes, and snippets.

Last active February 17, 2020 02:53
Show Gist options
  • Save nilsmehlhorn/f7ab3137939d5faff6ec9927d4c63455 to your computer and use it in GitHub Desktop.
Save nilsmehlhorn/f7ab3137939d5faff6ec9927d4c63455 to your computer and use it in GitHub Desktop.
Miscellaneous RxJs Operators
import {filter, map, shareReplay, catchError} from 'rxjs/operators'
import {Observable, Subject, throwError, defer} from 'rxjs'
import {Option} from 'fp-ts/lib/Option'
import {HttpErrorResponse} from '@angular/common/http'
import {HttpStatus} from 'http-status-codes'
* Deserializes plain object to class instance.
* @param ClassType class to instantiate
* @returns stream with deserialization applied
export const mapToClass = <T>(ClassType) => (source: Observable<T>): Observable<T> => source.pipe(
map(val => Object.assign(new ClassType(), val))
* Filters defined values.
* @returns stream with only defined values
export const filterUndefined = <T>() => (source: Observable<T>): Observable<T> => source.pipe(
filter(v => !!v)
* Filters defined option values and unpacks them.
* @returns stream with options that contain value
export const filterSome = <T>() => (source: Observable<Option<T>>): Observable<T> => source.pipe(
filter(o => o.isSome()),
map(o => o.toNullable())
* Shares an underlying observable for use with multiple async-pipes on the same source.
export const conflate = <T>() => (source: Observable<T>): Observable<T> => source.pipe(
bufferSize: 1,
refCount: true
* Invokes a callback upon subscription.
* @param callback function to invoke upon subscription
* @returns stream which will invoke callback
export function prepare<T>(callback: () => void): (source: Observable<T>) => Observable<T> {
return (source: Observable<T>): Observable<T> => defer(() => {
return source;
* Indicates whether the observable is currently loading (meaning subscription is active and
* it hasn't completed or errored).
* @param indicator subject as target for undication
* @returns stream which will indicate loading through passed subject
export function indicate<T>(indicator: Subject<boolean>): (source: Observable<T>) => Observable<T> {
return (source: Observable<T>): Observable<T> => source.pipe(
prepare(() =>,
finalize(() =>
* Maps Angular HTTP status codes to more semantic errors.
* @param codeErrors mapping from status code to error
* @returns stream which will map http error codes to passed errors
export const throwForCodes = (codeErrors: Array<[HttpStatus, Error]>) => {
const mappedCodeErrors = new Map(codeErrors)
return <T>(source: Observable<T>) =>
source.pipe(catchError(error => {
if (error instanceof HttpErrorResponse) {
return throwError(mappedCodeErrors.get(error.status) || error)
return throwError(error)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment