Skip to content

Instantly share code, notes, and snippets.

@chemicL
Created August 24, 2024 07:59
Show Gist options
  • Save chemicL/9e5e01cda24a97f16dbecba97c5fdfbc to your computer and use it in GitHub Desktop.
Save chemicL/9e5e01cda24a97f16dbecba97c5fdfbc to your computer and use it in GitHub Desktop.
Observable arbiters around a streaming request-response chain
package dev.jedrzejczyk.reactorlab;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
 * Demonstrates wrapping a request/response exchange with a chain of {@link Arbiter}s, once with
 * plain imperative code and once as a Reactor pipeline.
 *
 * <p>In both styles every arbiter's {@link Arbiter#before(Request)} is applied to the request in
 * list order, and every arbiter's {@link Arbiter#after(Response)} is applied to the response in
 * reverse list order, so the arbiters "wrap around" the exchange.
 */
public class WrapAroundExercise {

    /**
     * Runs the reactive variant with two suffix-appending arbiters, "A" and "B".
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ArbiterAppender a = new ArbiterAppender("A");
        ArbiterAppender b = new ArbiterAppender("B");
        Request request = new Request("Request");
        List<Arbiter> arbiters = List.of(a, b);

        // Swap the two calls below to compare the imperative variant.
        // imperativeStyle(request, arbiters);
        reactiveStyle(request, arbiters);
    }

    /**
     * Reactive variant: folds the request through every arbiter's {@code before}, prints it,
     * streams the response, passes each response element through every arbiter's {@code after}
     * in reverse order, and prints each resulting element. Blocks until the stream completes.
     *
     * @param request  the initial request
     * @param arbiters arbiters to apply; {@code before} in list order, {@code after} reversed
     */
    static void reactiveStyle(Request request, List<Arbiter> arbiters) {
        Flux.fromIterable(arbiters)
            .transformDeferredContextual((f, ctx) -> f
                // Arbiter calls may be blocking; hop to a scheduler where blocking is allowed.
                .publishOn(Schedulers.boundedElastic())
                .reduce(request, (r, a) -> {
                    // TODO: Here we can combine reactive with imperative by wrapping
                    // the imperative call with an Observation that has the
                    // parent set to ctx.get(ObservationThreadLocalAccessor.KEY)
                    return a.before(r);
                }))
            // transformDeferredContextual returns a Flux, so narrow the single reduced
            // value back to a Mono.
            .single()
            .doOnNext(r -> System.out.println("Request: " + r))
            .flatMapMany(r -> streamResponse(r)
                // A real response stream may emit on an arbitrary thread; make sure the
                // (potentially blocking) after-arbiters also run where blocking is allowed.
                .publishOn(Schedulers.boundedElastic())
                // Apply the after-arbiters to each streamed element. The same strategy could
                // instead be applied once to an aggregation of the response.
                .map(response -> applyAfter(response, arbiters)))
            .doOnNext(response -> System.out.println("Response: " + response))
            .blockLast();
    }

    /**
     * Imperative variant: applies every {@code before}, prints the request, performs the
     * exchange, applies every {@code after} in reverse order, and prints the response.
     *
     * @param request  the initial request
     * @param arbiters arbiters to apply; {@code before} in list order, {@code after} reversed
     */
    static void imperativeStyle(Request request, List<Arbiter> arbiters) {
        Request prepared = request;
        for (Arbiter arbiter : arbiters) {
            prepared = arbiter.before(prepared);
        }
        System.out.println("Request: " + prepared);

        Response response = applyAfter(makeRequest(prepared), arbiters);
        System.out.println("Response: " + response);
    }

    /** Passes {@code response} through every arbiter's {@code after}, last arbiter first. */
    private static Response applyAfter(Response response, List<Arbiter> arbiters) {
        Response result = response;
        for (Arbiter arbiter : arbiters.reversed()) {
            result = arbiter.after(result);
        }
        return result;
    }

    /**
     * Stand-in for a one-shot exchange.
     *
     * @param request currently unused
     * @return a fixed response with content {@code "Response"}
     */
    static Response makeRequest(Request request) {
        return new Response("Response");
    }

    /**
     * Stand-in for a streaming exchange.
     *
     * @param request currently unused
     * @return a stream with a single response whose content is {@code "Streaming response"}
     */
    static Flux<Response> streamResponse(Request request) {
        return Flux.just(new Response("Streaming response"));
    }

    /** Request carrying a text payload; {@link #toString()} returns that payload. */
    static class Request {
        final String content;

        public Request(String content) {
            this.content = content;
        }

        String content() {
            return content;
        }

        @Override
        public String toString() {
            return content;
        }
    }

    /** Response carrying a text payload; {@link #toString()} returns that payload. */
    static class Response {
        final String content;

        public Response(String content) {
            this.content = content;
        }

        String content() {
            return content;
        }

        @Override
        public String toString() {
            return content;
        }
    }

    /** Hook invoked around an exchange: once on the request, once on the response. */
    interface Arbiter {

        /** Returns the request to use in place of {@code request}. */
        Request before(Request request);

        /** Returns the response to use in place of {@code response}. */
        Response after(Response response);
    }

    /** Arbiter that appends {@code " _ " + suffix} to the request and response content. */
    static class ArbiterAppender implements Arbiter {
        final String suffix;

        public ArbiterAppender(String suffix) {
            this.suffix = suffix;
        }

        @Override
        public Request before(Request request) {
            return new Request(request.content() + " _ " + suffix);
        }

        @Override
        public Response after(Response response) {
            return new Response(response.content() + " _ " + suffix);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment