Last active
November 9, 2019 14:33
-
-
Save siddharthpal/4b2fe932c6e41dd84bd1d489d8733ba0 to your computer and use it in GitHub Desktop.
Polls multiple resources using an exponential back-off strategy, retrying failed HTTP requests up to 3 times. #pool-polling #rxjs6 #reactive-programming
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { timer as observableTimer, Subscription, interval, of, concat, SchedulerLike, asyncScheduler, Observable } from 'rxjs'; | |
import { takeWhile, tap, take, switchMap, repeat, retryWhen, scan, mapTo, expand, exhaustMap } from 'rxjs/operators'; | |
import { Injectable, Inject } from '@angular/core'; | |
import { HttpClient } from '@angular/common/http'; | |
/**
 * Settings for the back-off tick stream produced by `PollingService.intervalBackoff`.
 */
export interface IntervalBackoffConfig {
  /**
   * Base value handed to `backoffDelay` on every iteration.
   * Negative values are clamped to 0 by `intervalBackoff`.
   */
  initialInterval: number;
  /** Upper bound applied to every computed delay; `intervalBackoff` falls back to 10000 when omitted. */
  maxInterval?: number;
  /**
   * Computes the wait before the next tick from the zero-based iteration and the base interval.
   * When omitted, `intervalBackoff` uses `2 ** iteration * initialInterval`.
   */
  backoffDelay?: (iteration: number, initialInterval: number) => number;
}
/**
 * Per-call overrides for `PollingService.pollRequest`.
 * Any field left out falls back to the service's `defaultConfig`.
 */
export interface PollConfig {
  /** Wait between the first request and the start of the back-off polling loop (default 1000). */
  interval?: number;
  /** Number of errors after which polling gives up and the error is propagated (default 3). */
  attempts?: number;
  /** Wait before re-subscribing after a failed request (default 3000). */
  delay?: number;
  /** Base interval of the exponential back-off between polls (default 1000). */
  exponentialUnit?: number;
  /** Cap on the back-off interval between polls (default 10000). */
  maxBackoffTime?: number;
}
/**
 * Polls HTTP endpoints with an exponentially growing pause between requests
 * and a bounded number of retries for failed requests.
 *
 * Every poll started through `pollRequest` is collected in one parent
 * subscription so that `stopPolling` can cancel all of them at once.
 */
@Injectable()
export class PollingService {
  /** Parent subscription holding every poll that is currently running. */
  private pollSubscriptions: Subscription;

  /** Values used for any `PollConfig` field the caller does not supply. */
  private defaultConfig: Partial<PollConfig> = {
    interval: 1000,
    attempts: 3,
    exponentialUnit: 1000,
    delay: 3000,
    maxBackoffTime: 10000
  };

  constructor(
    private http: HttpClient,
  ) {
    this.pollSubscriptions = new Subscription();
  }

  /**
   * Starts polling `url` with GET requests.
   *
   * The first request is issued immediately. After it completes and
   * `interval` has elapsed, further requests follow on a back-off schedule
   * that starts at `exponentialUnit` and is capped at `maxBackoffTime`.
   * Polling continues for as long as `pollWhileCondition` returns a truthy value.
   *
   * @param url Endpoint to poll.
   * @param updateStatus Called with every response, including the one that ends polling.
   * @param pollWhileCondition Receives each response; a falsy result stops polling.
   * @param pollErrorCallback Called with the terminal error (for example once the
   *   retry budget is used up). When omitted the error is left unhandled.
   * @param onPollingSuccessCallback Called without arguments when polling
   *   finishes because `pollWhileCondition` returned a falsy value.
   * @param userConfig Overrides for the default timing and retry settings.
   */
  pollRequest<T>(
    url: string,
    updateStatus: any,
    pollWhileCondition: Function,
    pollErrorCallback?: Function,
    onPollingSuccessCallback?: Function,
    userConfig?: PollConfig
  ) {
    const options = Object.assign({}, this.defaultConfig, userConfig);

    // stopPolling() closes the parent subscription; a closed Subscription
    // immediately unsubscribes anything added to it, so start a fresh one.
    if (this.pollSubscriptions.closed) {
      this.pollSubscriptions = new Subscription();
    }

    const request$ = this.http.get<T>(url);

    const polling$ = interval(options.interval).pipe(
      take(1),
      switchMap(() => this.intervalBackoff({
        initialInterval: options.exponentialUnit,
        maxInterval: options.maxBackoffTime
      })),
      // Ticks that arrive while a request is still in flight are dropped.
      exhaustMap(() => request$),
      repeat()
    );

    const poll$ = concat(request$, polling$).pipe(
      retryWhen(errors$ => errors$.pipe(
        // The counter covers every failure seen by this poll; it is not
        // reset by successful responses in between.
        scan(
          (state: { errorCount: number, error: any }, err: any) =>
            ({ errorCount: state.errorCount + 1, error: err }),
          { errorCount: 0, error: null }
        ),
        switchMap(({ errorCount, error }) => {
          if (errorCount >= options.attempts) {
            throw error;
          }
          // Retrying re-subscribes to the whole sequence, so the back-off
          // schedule starts over after `delay`.
          return observableTimer(options.delay);
        })
      )),
      tap(updateStatus),
      takeWhile(data => pollWhileCondition(data))
    );

    // Only install handlers the caller actually supplied, so that without an
    // error callback the error still surfaces as an unhandled error.
    const onError = pollErrorCallback
      ? (err: any) => { pollErrorCallback(err); }
      : undefined;
    const onComplete = onPollingSuccessCallback
      ? () => { onPollingSuccessCallback(); }
      : undefined;

    this.pollSubscriptions.add(poll$.subscribe(undefined, onError, onComplete));
  }

  /** Cancels every poll started through this service instance. */
  stopPolling(): void {
    this.pollSubscriptions.unsubscribe();
  }

  /** Caps a computed back-off delay at `maxInterval`. */
  private getDelay(backoffDelay: number, maxInterval: number) {
    return Math.min(backoffDelay, maxInterval);
  }

  /** Default back-off curve: `initialInterval` doubled once per iteration. */
  private exponentialBackoffDelay(iteration: number, initialInterval: number) {
    return Math.pow(2, iteration) * initialInterval;
  }

  /**
   * Emits 0 immediately and then an increasing counter, waiting before each
   * further emission for the delay computed by `backoffDelay`, capped at
   * `maxInterval`. The stream never completes on its own.
   *
   * @param config Either the base interval or a full back-off configuration.
   * @param scheduler Scheduler used for the initial emission and the timers.
   */
  private intervalBackoff(
    config: number | IntervalBackoffConfig,
    scheduler: SchedulerLike = asyncScheduler,
  ): Observable<number> {
    const {
      initialInterval,
      maxInterval = 10000,
      backoffDelay = this.exponentialBackoffDelay
    } = typeof config === 'number' ? { initialInterval: config } : config;

    // A negative base interval is treated as no wait at all.
    const baseInterval = (initialInterval < 0) ? 0 : initialInterval;

    return of(0, scheduler).pipe(
      expand(iteration =>
        observableTimer(
          this.getDelay(backoffDelay(iteration, baseInterval), maxInterval),
          scheduler
        ).pipe(mapTo(iteration + 1))
      )
    );
  }
}
/* ==================================== Example: how to use the above code ========================== */
/* Step 1: Inject the above service into your component */
// Constructor of the consuming component: Angular injects the polling
// service, which is then reachable as `this.pollingSvc`.
constructor(
  private pollingSvc: PollingService,
) {
}
/* Step 2: Define the poll-while condition and the status-update methods */
// Receives every polled response and pushes it to `emitterSub`.
// NOTE(review): `emitterSub` and `PollStatus` are not defined in this file —
// presumably a Subject on the component and the response model; confirm.
updateStatus(pollStatus: PollStatus) {
  this.emitterSub.next(pollStatus);
}
// Tells the polling service whether to keep going after a response.
pollWhileCondition(pollStatus: PollStatus) {
  return pollStatus.executionStatus === 'running'; // If the polled status is still running, polling should continue
}
/* Step 3: Call the PollingService pollRequest method and you can see the magic */
// Start polling; the callbacks are bound so that `this` inside them still
// refers to the component when the service invokes them.
this.pollingSvc.pollRequest('http://poll.url',
  this.updateStatus.bind(this),
  this.pollWhileCondition.bind(this));
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment