Created
September 23, 2022 13:39
-
-
Save alaindet/d3a4025fa1167e22fca9e3cd06943d83 to your computer and use it in GitHub Desktop.
RxJS self-closing sources
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { BehaviorSubject, Observable, takeUntil } from 'rxjs'; | |
import { OnceSource } from './once-source'; | |
export type UpdaterFn<T> = (prev: T) => T; | |
/** | |
* # Data Source | |
* | |
* This wraps a data source in a RxJS BehaviorSubject and exposes an API for | |
* centralized reads and writes to the data source. Best suited for complex | |
* nested components with services | |
* | |
* ## Usage | |
* Best suited in services connecting multiple nested components, otherwise it's | |
* total overkill | |
* | |
* ## Lifecycle | |
* Lifecycle of the data source can be controlled directly or indirectly | |
* - Indirect control requires you to pass a signal$ of type Observable<void> to | |
* the constructor, which will then be used as a trigger to shut down the | |
* data source | |
* - Direct control requires no signal$, byt you must call EventSource.destroy() | |
* whenever the data source must be shut down | |
* | |
* ## Example | |
* | |
* ```ts | |
* @Component({ | |
* selector: 'my-component', | |
* template: '<p>Open? {{ open.data$ | async ? 'YES' : 'NO' }}</p>', | |
* }) | |
* class MyComponent implements OnInit, OnDestroy { | |
* | |
* private once = new OnceSource(); | |
* | |
* open = new DataSource<boolean>(false); | |
* indirect1 = new DataSource<string>('Something worth it.', this.once.event$); | |
* indirect2 = new DataSource<number>(42, this.once.event$); | |
* | |
* ngOnInit() { | |
* // Read data | |
* this.open.data$.subscribe(open => console.log('open', open)); | |
* | |
* // Set data based on previous data | |
* this.open.next(open => !open); | |
* | |
* // Just override data | |
* this.open.next(false); | |
* | |
* // Read data synchronously (not recommended) | |
* console.log('Current open', this.open.getCurrent()); | |
* } | |
* | |
* ngOnDestroy() { | |
* this.open.destroy(); // Direct lifecycle management | |
* | |
* // This triggers indirect control on every controlled source | |
* // Here, this.indirect1 and this.indirect2 | |
* this.once.trigger(); | |
* } | |
* } | |
* ``` | |
*/ | |
export class DataSource<T = any> { | |
data$!: Observable<T>; | |
private source$!: BehaviorSubject<T>; | |
private _current!: T; | |
private once?: OnceSource; | |
constructor(initial: T, signal$?: Observable<void>) { | |
this._current = initial; | |
this.source$ = new BehaviorSubject<T>(this._current); | |
if (!signal$) { | |
this.once = new OnceSource(); | |
signal$ = this.once.event$; | |
} | |
signal$.subscribe(() => this.destroy()); | |
this.data$ = this.source$.asObservable().pipe(takeUntil(signal$)); | |
} | |
destroy(): void { | |
if (!this.once) return; | |
this.once.trigger(); | |
} | |
getSource(): BehaviorSubject<T> { | |
return this.source$; | |
} | |
getCurrent(): T { | |
return this._current; | |
} | |
/** | |
* If passed a value, just updates the stream | |
* If passed a function, calls the function passing it the previous value and | |
* expecting the new value to be returned, then updates the stream with | |
* returned value | |
*/ | |
next(updater: T | UpdaterFn<T>): void { | |
if (typeof updater === 'function') { | |
this._current = (updater as Function)(this.source$.getValue()); | |
this.source$.next(this._current); | |
} else { | |
this._current = updater; | |
this.source$.next(updater); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable, Subject, takeUntil } from 'rxjs'; | |
import { OnceSource } from './once-source'; | |
/** | |
* # Event source | |
* | |
* This wraps an event source in a RxJS Subject and exposes and API for | |
* centralized reads and writes to the event Source. | |
* | |
* ## Usage | |
* Best suited in services connecting multiple nested components, otherwise it's | |
* total overkill | |
* | |
* ## Lifecycle | |
* Lifecycle of the event source can be controlled directly or indirectly | |
* - Indirect control requires you to pass a signal$ of type Observable<void> to | |
* the constructor, which will then be used as a trigger to shut down the | |
* event source | |
* - Direct control requires no signal$, byt you must call EventSource.destroy() | |
* whenever the event source must be shut down | |
* | |
* ## Example | |
* | |
* ```ts | |
* @Component({ | |
* selector: 'my-component', | |
* template: ` | |
* <div> | |
* <p>A or B?</p> | |
* <button (click)="onSelectA()">A</button> | |
* <button (click)="onSelectB()">B</button> | |
* </div> | |
* `, | |
* }) | |
* class MyComponent implements OnInit, OnDestroy { | |
* | |
* private once = new OnceSource(); | |
* | |
* @Output() selectedA = new EventEmitter<void>(); | |
* @Output() selectedB = new EventEmitter<void>(); | |
* | |
* private selectAEvent = new EventSource(this.once.event$); | |
* private selectBEvent = new EventSource(); | |
* | |
* ngOnInit() { | |
* this.selectAEvent.event$.subscribe(e => this.selectedA.emit(e)); | |
* this.selectBEvent.event$.subscribe(e => this.selectedB.emit(e)); | |
* } | |
* | |
* onSelectA() { | |
* this.selectAEvent.next(); | |
* } | |
* | |
* onSelectB() { | |
* this.selectBEvent.next(); | |
* } | |
* | |
* ngOnDestroy() { | |
* this.selectBEvent.destroy(); // Direct control | |
* | |
* // This triggers indirect control on every controlled source | |
* // Here, this.indirect1 and this.indirect2 | |
* this.once.trigger(); | |
* } | |
* } | |
* ``` | |
*/ | |
export class EventSource<T = any> { | |
event$: Observable<T>; | |
private source$!: Subject<T>; | |
private once?: OnceSource; | |
constructor(signal$?: Observable<void>) { | |
this.source$ = new Subject<T>(); | |
this.event$ = this.source$.asObservable(); | |
if (!signal$) { | |
this.once = new OnceSource(); | |
signal$ = this.once.event$; | |
} | |
signal$.subscribe(() => this.destroy()); | |
this.event$ = this.source$.asObservable().pipe(takeUntil(signal$)); | |
} | |
destroy(): void { | |
if (!this.once) return; | |
this.once.trigger(); | |
} | |
next(eventData: T): void { | |
this.source$.next(eventData); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable, Subject, takeUntil } from 'rxjs'; | |
export class OnceSource { | |
event$: Observable<void>; | |
private source$!: Subject<void>; | |
constructor() { | |
this.source$ = new Subject<void>(); | |
this.event$ = this.source$.asObservable().pipe(takeUntil(this.source$)); | |
} | |
trigger(): void { | |
this.source$.next(); | |
this.source$.complete(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment