TracingOperator with Context
import java.util.function.BiFunction;
import java.util.function.Function;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SpanProcessor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
public class Example {
private static final OpenTelemetry openTelemetry = configureOpenTelemetry();
private static final Tracer tracer = openTelemetry.getTracer("test");
private static final Mono<String> READ_ITEM = readItem();
public static void main(String[] args) {
Span span = tracer.spanBuilder("parent").startSpan();
try (Scope ignored = span.makeCurrent()) {
String item = READ_ITEM.block();
static Mono<String> readItem() {
return httpGet("https://mydatabase").map(v -> "item: " + v)
static Mono<String> httpGet(String url) {
return Mono.defer(() -> Mono.just("http response from " + url))
.transform(TracingOperator.withSpan("httpGet", builder -> builder
.setAttribute("url", url)
private static OpenTelemetry configureOpenTelemetry() {
return OpenTelemetrySdk.builder()
.addSpanProcessor(new SpanProcessor() {
public void onStart(io.opentelemetry.context.Context parentContext, ReadWriteSpan span) {
public boolean isStartRequired() {
return false;
public void onEnd(ReadableSpan span) {
public boolean isEndRequired() {
return true;
final class TracingOperator<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {
private static final String REACTOR_OTEL_CONTEXT_KEY = "otel-context";
public static <T> Function<Publisher<T>, Publisher<T>> withSpan(String spanName) {
return withSpan(spanName, Function.identity());
public static <T> Function<Publisher<T>, Publisher<T>> withSpan(String spanName, Function<SpanBuilder, SpanBuilder> builder) {
return (Function<Publisher<T>, Publisher<T>>) Operators.lift(new TracingOperator<T>(spanName, builder));
private final Tracer tracer;
private final String spanName;
private final Function<SpanBuilder, SpanBuilder> builder;
private final io.opentelemetry.context.Context assemblyContext;
private TracingOperator(String spanName, Function<SpanBuilder, SpanBuilder> builder) {
this.tracer = GlobalOpenTelemetry.getTracer("reactor");
this.spanName = spanName;
this.builder = builder;
this.assemblyContext = io.opentelemetry.context.Context.current();
public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> subscriber) {
Context subscriberContext = subscriber.currentContext();
io.opentelemetry.context.Context parentContext = figureOutParentContext(subscriberContext);
Span span = builder.apply(tracer.spanBuilder(spanName).setParent(parentContext)).startSpan();
io.opentelemetry.context.Context spanContext = parentContext.with(span);
subscriberContext = subscriberContext.put(REACTOR_OTEL_CONTEXT_KEY, spanContext);
return new TracingSubscriber<>(subscriber, subscriberContext, spanContext, span);
* We have three potential OTel contexts here and we need to work out which is the best one to use.
* <ul>
* <li>The current OTel context</li>
* <li>The OTel Context stored in the Reactor Context</li>
* <li>The OTel context captured during assembly</li>
* </ul>
private io.opentelemetry.context.Context figureOutParentContext(Context subscriberContext) {
if (subscriberContext.hasKey(REACTOR_OTEL_CONTEXT_KEY)) {
return subscriberContext.get(REACTOR_OTEL_CONTEXT_KEY);
return io.opentelemetry.context.Context.current();
final class TracingSubscriber<T> implements CoreSubscriber<T>, Subscription {
private static final AttributeKey<Boolean> REACTOR_CANCELED_ATTRIBUTE_KEY = AttributeKey.booleanKey("reactor.canceled");
private final CoreSubscriber<T> subscriber;
private final Context subscriberContext;
private final io.opentelemetry.context.Context otelContext;
private final Span span;
private Subscription subscription;
public TracingSubscriber(CoreSubscriber<T> subscriber, Context subscriberContext, io.opentelemetry.context.Context otelContext, Span span) {
this.subscriber = subscriber;
this.subscriberContext = subscriberContext;
this.otelContext = otelContext;
this.span = span;
public synchronized void onSubscribe(Subscription subscription) {
if (Operators.validate(this.subscription, subscription)) {
this.subscription = subscription;
try (Scope scope = this.otelContext.makeCurrent()) {
public void request(long n) {
if (subscription != null) {
try (Scope scope = this.otelContext.makeCurrent()) {
public void cancel() {
if (subscription != null) {
if (span != null) {
public void onNext(T t) {
try (Scope scope = otelContext.makeCurrent()) {
public void onError(Throwable t) {
if (span != null) {
try (Scope scope = otelContext.makeCurrent()) {
public void onComplete() {
if (span != null) {
try (Scope scope = otelContext.makeCurrent()) {
public Context currentContext() {
return this.subscriberContext;
