@JozefFlakus
Last active January 31, 2020 19:09
Marble.js 3.0 - continuous streams
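import { r, HttpStatus, HttpRequestBusToken, useContext } from '@marblejs/core';
import { from, of } from 'rxjs';
import { bufferWhen, catchError, filter, map, mergeMap } from 'rxjs/operators';

// NOTE: processData is not part of this snippet; it is assumed to be defined
// elsewhere and to return something mergeMap can flatten (Observable, Promise, ...).

const foo$ = r.pipe(
  // Continuous mode: the effect maps each response back to its request explicitly.
  r.applyMeta({ continuous: true }),
  r.matchPath('/'),
  r.matchType('GET'),
  r.useEffect((req$, ctx) => {
    // Stream of all incoming HTTP requests, used here to detect the flush signal.
    const reqBus$ = useContext(HttpRequestBusToken)(ctx.ask);
    const terminate$ = reqBus$.pipe(filter(req => req.url === '/flush'));

    return req$.pipe(
      // Hold matching requests until a request to '/flush' arrives...
      bufferWhen(() => terminate$),
      // ...then release the buffered requests one by one.
      mergeMap(buffer => from(buffer)),
      // Process each request in its own inner stream, so that a failure
      // produces an error response for that request without ending the effect.
      mergeMap(request => of(request).pipe(
        mergeMap(processData),
        map(body => ({ body, request })),
        catchError(error => of({
          request,
          status: HttpStatus.BAD_REQUEST,
          body: { error: { message: error.message } },
        })),
      )),
    );
  }));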
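
The buffering step in the effect above (`bufferWhen(() => terminate$)`) can be hard to picture from the operator chain alone. Below is a minimal, dependency-free sketch of the same idea using plain arrays instead of RxJS streams. The `BusEvent` type and the `bufferUntilFlush` function are hypothetical names introduced only for illustration; they are not part of Marble.js or RxJS, and the sketch simplifies edge cases (for example, it drops an empty trailing batch).

```typescript
// Hypothetical event model: either a request to buffer or a flush signal.
type BusEvent =
  | { kind: 'request'; id: number }
  | { kind: 'flush' };

// Collect request ids into a pending batch and release the batch whenever
// a flush event arrives. Requests still pending when the input ends are
// released as a final batch (only if there are any).
function bufferUntilFlush(events: BusEvent[]): number[][] {
  const batches: number[][] = [];
  let pending: number[] = [];

  for (const event of events) {
    if (event.kind === 'flush') {
      batches.push(pending); // release everything buffered so far
      pending = [];          // start a fresh buffer
    } else {
      pending.push(event.id);
    }
  }

  if (pending.length > 0) {
    batches.push(pending);
  }
  return batches;
}

// Two requests are held back until the flush, the third stays in a later batch.
const batches = bufferUntilFlush([
  { kind: 'request', id: 1 },
  { kind: 'request', id: 2 },
  { kind: 'flush' },
  { kind: 'request', id: 3 },
]);
console.log(JSON.stringify(batches)); // [[1,2],[3]]
```

In the gist, the role of the flush event is played by a request to `/flush` observed on the request bus, and each released batch is flattened back into individual requests before processing.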