Skip to content

Instantly share code, notes, and snippets.

@HaloFour
Created September 21, 2021 15:26
Show Gist options
  • Save HaloFour/6cac6653120da96fc1e8e1effca7b5b5 to your computer and use it in GitHub Desktop.
Save HaloFour/6cac6653120da96fc1e8e1effca7b5b5 to your computer and use it in GitHub Desktop.
TracingOperator with Context
import java.util.function.BiFunction;
import java.util.function.Function;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SpanProcessor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
/**
 * Small runnable demo: wraps two nested reactive steps ({@code readItem} -> {@code httpGet})
 * in spans via {@link TracingOperator} and prints every ended span to standard output.
 */
public class Example {

    // Field order matters: the SDK is built and registered globally first, because
    // readItem() constructs TracingOperator instances, which look up a tracer through
    // GlobalOpenTelemetry.
    private static final OpenTelemetry openTelemetry = configureOpenTelemetry();
    private static final Tracer tracer = openTelemetry.getTracer("test");

    // Assembled once, during class initialization, i.e. before main() starts the "parent" span.
    private static final Mono<String> READ_ITEM = readItem();

    /**
     * Starts a "parent" span, makes it current, blocks on {@link #READ_ITEM} and prints the result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Span span = tracer.spanBuilder("parent").startSpan();
        try (Scope ignored = span.makeCurrent()) {
            String item = READ_ITEM.block();
            System.out.println(item);
        } finally {
            // End the span once its scope is closed; a span that is never ended is never
            // handed to the SpanProcessor's onEnd, so the parent would not be printed.
            span.end();
        }
    }

    /**
     * Builds the traced "readItem" pipeline on top of {@link #httpGet(String)}.
     *
     * @return a Mono emitting the response prefixed with {@code "item: "}
     */
    static Mono<String> readItem() {
        return httpGet("https://mydatabase")
                .map(v -> "item: " + v)
                .transform(TracingOperator.withSpan("readItem"));
    }

    /**
     * Simulates an HTTP GET (no network access happens) and wraps it in an "httpGet" span
     * carrying the {@code url} attribute.
     *
     * @param url the URL echoed into the fake response and recorded on the span
     * @return a Mono emitting {@code "http response from " + url}
     */
    static Mono<String> httpGet(String url) {
        return Mono.defer(() -> Mono.just("http response from " + url))
                .transform(TracingOperator.withSpan("httpGet", builder -> builder
                        .setAttribute("url", url)
                ));
    }

    /**
     * Builds an SDK whose only span processor prints each ended span to standard output,
     * and registers that SDK as the global OpenTelemetry instance.
     *
     * @return the configured, globally registered instance
     */
    private static OpenTelemetry configureOpenTelemetry() {
        return OpenTelemetrySdk.builder()
                .setTracerProvider(SdkTracerProvider.builder()
                        .addSpanProcessor(new SpanProcessor() {
                            @Override
                            public void onStart(io.opentelemetry.context.Context parentContext, ReadWriteSpan span) {
                                // Start notifications are not needed; see isStartRequired().
                            }

                            @Override
                            public boolean isStartRequired() {
                                return false;
                            }

                            @Override
                            public void onEnd(ReadableSpan span) {
                                System.out.println(span);
                            }

                            @Override
                            public boolean isEndRequired() {
                                return true;
                            }
                        })
                        .build())
                .buildAndRegisterGlobal();
    }
}
/**
 * Lifter that wraps each subscriber of a publisher in a {@link TracingSubscriber}, starting one
 * span per subscription.
 *
 * <p>The new span's OTel context is stored in the Reactor context under
 * {@code "otel-context"}, which is how operators further upstream find their parent span.
 *
 * @param <T> element type of the traced publisher
 */
final class TracingOperator<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

    private static final String REACTOR_OTEL_CONTEXT_KEY = "otel-context";

    private final Tracer tracer;
    private final String spanName;
    private final Function<SpanBuilder, SpanBuilder> builder;
    // Captured when the operator is constructed; not consulted by figureOutParentContext.
    private final io.opentelemetry.context.Context assemblyContext;

    private TracingOperator(String spanName, Function<SpanBuilder, SpanBuilder> builder) {
        this.tracer = GlobalOpenTelemetry.getTracer("reactor");
        this.spanName = spanName;
        this.builder = builder;
        this.assemblyContext = io.opentelemetry.context.Context.current();
    }

    /**
     * Creates a transformer that traces the publisher with a span of the given name.
     *
     * @param spanName name of the span started for every subscription
     * @param <T> element type of the publisher
     * @return a function suitable for {@code transform(...)}
     */
    public static <T> Function<Publisher<T>, Publisher<T>> withSpan(String spanName) {
        return withSpan(spanName, Function.identity());
    }

    /**
     * Creates a transformer that traces the publisher with a span of the given name, letting the
     * caller customize the span before it is started.
     *
     * @param spanName name of the span started for every subscription
     * @param builder hook applied to the {@link SpanBuilder} (e.g. to set attributes)
     * @param <T> element type of the publisher
     * @return a function suitable for {@code transform(...)}
     */
    public static <T> Function<Publisher<T>, Publisher<T>> withSpan(String spanName, Function<SpanBuilder, SpanBuilder> builder) {
        TracingOperator<T> operator = new TracingOperator<T>(spanName, builder);
        // The lifter maps a subscriber of T to a subscriber of T, so narrowing the result of
        // Operators.lift back to Publisher<T> on both sides is expected to be safe.
        @SuppressWarnings("unchecked")
        Function<Publisher<T>, Publisher<T>> lifted =
                (Function<Publisher<T>, Publisher<T>>) Operators.lift(operator);
        return lifted;
    }

    /**
     * Starts the span for one subscription and decorates the downstream subscriber.
     *
     * @param scannable the publisher being lifted (unused)
     * @param subscriber the downstream subscriber
     * @return a {@link TracingSubscriber} that exposes a Reactor context containing the new span's
     *     OTel context
     */
    @Override
    public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> subscriber) {
        Context downstreamContext = subscriber.currentContext();
        io.opentelemetry.context.Context parent = figureOutParentContext(downstreamContext);

        SpanBuilder spanBuilder = tracer.spanBuilder(spanName).setParent(parent);
        Span span = builder.apply(spanBuilder).startSpan();
        io.opentelemetry.context.Context contextWithSpan = parent.with(span);

        // Publish the span's context through the Reactor context so operators that read this
        // subscriber's context can parent their own spans on it.
        Context enrichedContext = downstreamContext.put(REACTOR_OTEL_CONTEXT_KEY, contextWithSpan);
        return new TracingSubscriber<>(subscriber, enrichedContext, contextWithSpan, span);
    }

    /**
     * Picks the OTel context the new span is parented on.
     *
     * <p>Resolution order:
     *
     * <ol>
     *   <li>the OTel context stored in the Reactor context, when present;</li>
     *   <li>otherwise the OTel context that is current on the calling thread.</li>
     * </ol>
     *
     * <p>The OTel context captured during assembly is a third candidate, but it is not used here.
     */
    private io.opentelemetry.context.Context figureOutParentContext(Context subscriberContext) {
        if (!subscriberContext.hasKey(REACTOR_OTEL_CONTEXT_KEY)) {
            return io.opentelemetry.context.Context.current();
        }
        return subscriberContext.get(REACTOR_OTEL_CONTEXT_KEY);
    }
}
final class TracingSubscriber<T> implements CoreSubscriber<T>, Subscription {
private static final AttributeKey<Boolean> REACTOR_CANCELED_ATTRIBUTE_KEY = AttributeKey.booleanKey("reactor.canceled");
private final CoreSubscriber<T> subscriber;
private final Context subscriberContext;
private final io.opentelemetry.context.Context otelContext;
private final Span span;
private Subscription subscription;
public TracingSubscriber(CoreSubscriber<T> subscriber, Context subscriberContext, io.opentelemetry.context.Context otelContext, Span span) {
this.subscriber = subscriber;
this.subscriberContext = subscriberContext;
this.otelContext = otelContext;
this.span = span;
}
@Override
public synchronized void onSubscribe(Subscription subscription) {
if (Operators.validate(this.subscription, subscription)) {
this.subscription = subscription;
try (Scope scope = this.otelContext.makeCurrent()) {
subscriber.onSubscribe(this);
}
}
}
@Override
public void request(long n) {
if (subscription != null) {
try (Scope scope = this.otelContext.makeCurrent()) {
subscription.request(n);
}
}
}
@Override
public void cancel() {
if (subscription != null) {
subscription.cancel();
if (span != null) {
span.setAttribute(REACTOR_CANCELED_ATTRIBUTE_KEY, true)
.end();
}
}
}
@Override
public void onNext(T t) {
try (Scope scope = otelContext.makeCurrent()) {
subscriber.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (span != null) {
span.recordException(t)
.end();
}
try (Scope scope = otelContext.makeCurrent()) {
subscriber.onError(t);
}
}
@Override
public void onComplete() {
if (span != null) {
span.end();
}
try (Scope scope = otelContext.makeCurrent()) {
subscriber.onComplete();
}
}
@Override
public Context currentContext() {
return this.subscriberContext;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment