@Antonio-Laguna
Created January 19, 2017 10:45
Limit concurrency with RxJS
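// Limits a stream to at most `limit` active events at a time.
// `outputComplete$` must emit once each time an active event finishes.
// Each emission of the returned stream is a `[true, input]` pair, because
// `zip` is called without a result selector.
function limitStream(input$, outputComplete$, limit = 10) {
  // `limit` initial tokens, so the first `limit` inputs pass straight through
  const initialOutputLimiter$ = Rx.Observable.range(1, limit).map(() => true);
  // a stream of bools indicating when to pass the next input to the output stream
  const outputLimiter$ = Rx.Observable.concat(initialOutputLimiter$, outputComplete$);
  // zip pairs the nth token with the nth input, so an input waits until a token is free
  return Rx.Observable.zip(outputLimiter$, input$);
}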
// Creates a responses proxy that we can pass to limitStream.
// This enables our circular dependency of passing the completed responses to
// the request limiter
const responsesProxy$ = new Rx.Subject();
// Creates the limited stream of requests.
// `requests$` is not defined in this snippet; it is assumed to be an existing
// Observable of requests.
const limitedRequests$ = limitStream(requests$, responsesProxy$, 10);
// We need to track each request's completion. Usually we would map limitedRequests$
// to an XHR call and end up with a responses stream, but for this example we simply
// map requests to `true`
const responses$ = limitedRequests$.map(() => true);
// Now we pipe our responses to the responsesProxy that limitedRequests$ uses to
// release another request
responses$.subscribe(responsesProxy$);
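
The token-pairing idea behind `limitStream` can also be sketched in plain JavaScript with no RxJS dependency, which may make the mechanics easier to follow. This is only an illustration under assumptions: `createLimiter`, `push`, `release` and `onStart` are hypothetical names that do not appear in the gist, and the sketch mirrors the counting behaviour of `zip` rather than reproducing RxJS itself.

```javascript
// Plain-JavaScript sketch of the token/zip idea (no RxJS needed).
// createLimiter, push, release and onStart are hypothetical names.
function createLimiter(limit, onStart) {
  let tokens = limit;   // plays the role of initialOutputLimiter$
  const pending = [];   // inputs waiting for a token, like zip's buffer

  // Pair waiting inputs with free tokens, one token per input.
  function drain() {
    while (tokens > 0 && pending.length > 0) {
      tokens -= 1;
      onStart(pending.shift());
    }
  }

  return {
    // A new input arrives (an emission on input$).
    push(item) {
      pending.push(item);
      drain();
    },
    // A piece of work finished (an emission on outputComplete$).
    release() {
      tokens += 1;
      drain();
    },
  };
}

// Usage: at most 2 items are active at any time.
const started = [];
const limiter = createLimiter(2, item => started.push(item));
['a', 'b', 'c', 'd'].forEach(item => limiter.push(item));
// 'a' and 'b' start immediately; 'c' and 'd' wait for a token.
limiter.release();
// One token was returned, so 'c' has now started as well.
```

As in the RxJS version, nothing new starts until a completion signal hands a token back, so the number of active items never exceeds the limit.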