Skip to content

Instantly share code, notes, and snippets.

@lmolkova
Last active August 15, 2023 20:41
Show Gist options
  • Save lmolkova/e4215c0f44a49ef824983382762e6b92 to your computer and use it in GitHub Desktop.
Save lmolkova/e4215c0f44a49ef824983382762e6b92 to your computer and use it in GitHub Desktop.
EventHub and ServiceBus instrumentation

Short version

Context propagation protocol

application SDK MUST propagate trace parent part of the context (version, traceId, parentId and traceFlags) in the message in Diagnostic-Id application property following W3C trace-context format for traceparent encoding.

Also, propagate W3C trace context using W3C Trace Context Propagator from OpenTelemetry. It should populate traceparent and tracestate application properties. traceparent must match Diagnostic-Id value.

When extracting the context, first get W3C trace-context and then (if missing) read Diagnostic-Id.

Sending messages

Every message MUST carry trace context and trace context MUST be unique per message if tracing is enabled.

SDK MUST generate context for message when adding message to a batch OR when sending it.

SDK MUST create a span for each user send call add links to each message's context. SDK MUST set links before starting the span.

If message span is created in the same scope as send span, they will be siblings.

SDK MUST set attributes and other properties on spans if they are sampled in Send span:

  • span name: EventHubs.send - matches class/method pattern, but does not have to be precise
  • kind: client
  • az.namespace attribute : Microsoft.EventHub
  • messaging.destination.name attribute: ServiceBus/EventHub entity name (without partitionId or subscription id)
  • net.peer.name attribute: Fully qualified EventHub name such as <name>.servicebus.windows.net
  • messaging.system attribute should match eventhubs or servicebus
  • messaging.operation: publish
  • messaging.batch.message_count: in case more than one message is sent, should match number of messages in a batch
  • status

message span:

  • span name: EventHubs.message
  • kind: producer
  • az.namespace attribute : Microsoft.EventHub
  • messaging.destination.name attribute: ServiceBus/EventHub entity name (without partitionId or subscription id)
  • net.peer.name attribute: Fully qualified EventHub name such as <name>.servicebus.windows.net
  • messaging.system attribute should match eventhubs or servicebus

Receiving messages

SDK MUST create span for receive calls (when called by users and not processor client).

SDK should remember timestamp when receive method was started and then go ahead and receive messages.

Once messages are received, it should start a span, passing the recorded timestamp to OTel span builder. It should also extract context from all the received messages ans set such contexts as links on this receive span.

Each link MUST have enqueuedTime attribute with unix epoch time with milliseconds precision representing when message was enqueued (x-opt-enqueued-time system property). Attribute value SHOULD have long type (if not possible, use string).

Receive span:

  • span name: EventHubs.receive - matches class/method pattern, but does not have to be precise
  • kind: client
  • az.namespace attribute : Microsoft.EventHub
  • messaging.destination.name attribute: ServiceBus/EventHub entity name (without partitionId or subscription id)
  • net.peer.name attribute: Fully qualified EventHub name such as <name>.servicebus.windows.net
  • messaging.system attribute should match eventhubs or servicebus
  • messaging.operation: receive
  • messaging.batch.message_count: in case more than one message is received, should match number of messages in a batch
  • status

Processing messages

If SDK supports event processing loop it MUST create span for message processing. If Receiver/Consumer client has callback mode, and callback is called when messages are received, it should follow processing semantics defined here.

If SDK executes user callback per single message, it MUST wrap this callback with a processing span and make it direct child of context in the message.

If SDK gives an array of messages to user callback, SDK MUST read trace context from each message, create a processing span and link all the messages context to the new span.

SDK MUST add links before starting span.

Each link MUST have enqueuedTime attribute with unix epoch time with milliseconds precision representing when message was enqueued (x-opt-enqueued-time system property). Attribute value SHOULD have long type (if not possible, use string).

If implicit context is supported, SDK MUST set processing span on it.

If SDK calls user code in fire-and-forget way, SDK MUST still create a span for processing (and end it immediately after invoking user code).

SDK MUST set attributes on the processing span:

  • span name: EventHubs.process
  • kind: consumer
  • az.namespace attribute : Microsoft.EventHub
  • messaging.destination.name attribute: ServiceBus/EventHub entity name (without partitionId or subscription id)
  • net.peer.name attribute: Fully qualified EventHub name such as <name>.servicebus.windows.net
  • messaging.system attribute should match eventhubs or servicebus
  • messaging.operation: process
  • messaging.batch.message_count: in case more than one message is processed, should match number of messages in a batch
  • status

Other API calls that involve communication with service

SDK MUST instrument calls to the service that settle the message. If such calls happen within the scope of processing span or user span, they MUST become children of this span.

If specific message is being settled, settle span should be linked to that message trace context.

Settle span:

  • span name: ServiceBus.complete, ServiceBus.abandon, ServiceBus.deadLetter, etc. (strict match is not required)
  • kind: client
  • az.namespace attribute : Microsoft.ServiceBus
  • messaging.destination.name attribute: ServiceBus/EventHub entity name (without partitionId or subscription id)
  • net.peer.name attribute: Fully qualified EventHub name such as <name>.servicebus.windows.net
  • messaging.system attribute should match eventhubs or servicebus
  • messaging.operation: settle

Full version

Context propagation protocol

SDK MUST propagate trace parent part of the context (version, traceId, parentId and traceFlags) in the message in Diagnostic-Id property following W3C trace-context format for traceparent encoding.

SDK MUST ignore tracestate at this point.

Motivation

EventHubs messages are propagated throught EventHubs service as is. The trace context MUST be stamped on the message when instrumentation is enabled regardless of message span sampling decision.

Propagation format through EventHubs is proposed here. Note this is proposal against proposal.

Following this proposal has following drawbacks and risks

  • If proposal changes, we need to take care of backward compatibility
  • It requires additional work in EventHubs SDK to support byte[] payloads
  • It required additional work in OpenTelemetry to support different encodings for traceparent and tracestate (byte[] and string).

There is also existing header backed into the EventHubs SDK: Diagnostic-Id.

So we have a couple of options:

  1. Keep using Diagnostic-Id to propagate W3C context for now. switch to W3C AMQP proposal when it is more mature.
    • Propagate 00-traceid-spanid-0<?> as string in Diagnostic-Id
    • On receiving side, in .NET pass Diagnostic-Id to Activity - it will figure out the format. In other languages - ignore ids in old format
    • Do not support tracestate propagation
  2. Implement new AMPQ proposal
    • Propagate Traceparent as binary
    • Propagate Tracestate as string
    • Implement backward-compatibitlity for Diagnostic-Id in .NET

Pros (Option 1):

  • do not introduce a new thing (header) that might change in future.
  • Save us in future from another layer of backward-compatibility
  • No need to start supporting binary payloads
  • No need to start supporting different encodings for traceparent and tracestate in OpenTelemetry

Cons:

  • Tracestate is not propagated
  • Different client libraries don't share the same protocol (kafka to EH) - which is reasonable in absence of standard and there are no other official insturmentations.

So we will pass trace-parent through existing Diagnostic-Id and wait for the standard to mature. Azure Monitor will keep pushing for AMQP protocol standartization.

Tracing calls to EventHub service

It could be useful to trace send/receive calls to EventHub service via EventHubs diagnostics logs. AQMP trace-context proposal does not gives any guidance on this. This spec also does not cover such scenarios at this point.

Batching and OpenTelemetry links

EventHubs SDKs (ServiceBus, Kafka and other messaging SDKs) support sending, receiving and processing batches of messages. Tracing of batched operation require new kind of relationship between spans that allows to associate one span with many others.

Let's say producer created several message and injected a unique tracing context into each message. Now consumer received the batch. Each message has it's own context. Let's imagine producer calculates average over something in the batch and publishes result to the storage. For this, consumer will create a new span for the processing part, do aggregation and call storage (which will lead to other spans being created). Consumer will use OpenTelemetry mechanism called link to represent relation to messages.

Link is basically a span context (traceId, spanId and flags that were propagated over the wire). Processing span will have a list of such links so UX will be able to show the relationship between traces.

Send messages

EventHubs SDK API allows sending single messages or batches.

Every message MUST carry trace context and trace context MUST be unique per message. It must be possible to trace messages separately. Creating new context requires new span to be created and reported as each new context should be logged to maintain causation between spans.

SDK MUST generate context for message when creating message OR when sending it

  1. If SDK generates context when message is constructed, messages become related to the scope where they were created and if messages are sent in the different scope (background thread) they still carry proper context.
  2. There is an assumption that users create and send messages in the same scope. In this case, SDK may stamp context when message is sent (if it does not have context).
  3. If SDK provides helpers to create batches (as EventDataBatch such helpers can take care of context creating and stamping. Users can also manually stamp context if needed during creation.

So SDK may use strategy p1 (stamp context in message constructor) or combination of p2 and p3.

SDK MUST create a span for each send call from user and add links to each message's context

Context may be created for the message before send happens, it is possible to extract the context from the message to create a link, but not efficient. As an optimization, SDK should store the message span context instance somewhere (as EventData class internal property or weak reference) to simplify linking and make it efficient.

SDK MUST set links before starting the send span. This enables sampling based on links sampling decisions.

SDK MUST set attributes and other properties on spans if they are sampled in Send span:

  • span name: Azure.EventHubs.send
  • kind: client
  • az.namespace attribute : Microsoft.EventHub
  • message_bus.destination attribute: EventHub entity name
  • peer.address attribute: Fully qualified EventHub service endpoint such as <name>.servicebus.windows.net
  • status

message span:

  • span name: Azure.EventHubs.message
  • kind: producer
  • az.namespace attribute : Microsoft.EventHub
  • message_bus.destination attribute: EventHub entity name
  • peer.address attribute: Fully qualified EventHub service endpoint such as <name>.servicebus.windows.net

If message span is created in the same scope as send span, they MUST be siblings.

OpenTelemetry example

public static void sendSync(Iterable<EventData> messages, Context context) {
  // check if we have context passed explicitly or implicitly
  Span parentSpan = (Span) context.getData(OPENTELEMETRY_SPAN_KEY).orElse(TRACER.getCurrentSpan());

  Span.Builder builder = tracer
      .spanBuilder("Azure.EventHubs.send")
      .setParent(parentSpan)
      .setSpanKind(Kind.CLIENT); 

  // add links and generate context
  for(EventData msg : messages){
    if (msg.SpanContext != null) {
      // if message has context, link it to the span
      // if may have context from previous retry or it was set when creaing EventData or EventDataBatch
      builder.addLink((SpanContext)msg.SpanContext);
    } else {
      // otherwise, create a context. No need to link it because messages are created in the same scope as
      // Send happens and they are children of the same span
      Span msgSpan = tracer
          .spanBuilder("Azure.EventHubs.message")
	  .setSpanKind(Kind.PRODUCER); // Producer not available on OpenCensus, set internal for census
          .setParent(parentSpan)
          .startSpan();

      String traceparent = getDiagnosticId(msgSpan.getContext());
      msg.getProperties().putIfAbsent("Diagnostic-Id", traceparent);

      msgSpan.end();

      msg.SpanContext = msgSpan.getContext();
    }
  }

  Span span = builder.startSpan();
  if (span.isRecordingEvents()) {
    span.setAttribute("az.namespace", `"Microsoft.EventHub");
    span.setAttribute("message_bus.destination", this.entityPath);
    span.setAttribute("peer.address", this.endpoint);
  }

  try {
    // do send ...
    ehClient.sendSyncWithRetries(messages);
  } catch (EventHubException ex){
    span.setStatus(Status.UNKNOWN.withDescription(ex.toString()));
  }
  span.end();
}

private static final String getDiagnosticId(SpanContext spanContext) {
  char[] chars = new char[55];
  chars[0] = '0';
  chars[1] = '0';
  chars[2] = '-';
  spanContext.getTraceId().copyLowerBase16To(chars, 3);
  chars[35] = '-';
  spanContext.getSpanId().copyLowerBase16To(chars, 36);
  chars[52] = '-';
  spanContext.getTraceOptions().copyLowerBase16To(chars, 53);
  return new String(chars);
}

C# example

.NET has primitives to work with distributed tracing - System.Diagnostics.Activity and System.Diagnostics.DiagnosticSource. They allow to decouple library instrumentation from the listener without adding external dependencies.

Example below demonstrates Activity usage (and intentionally skips DiagnosticSource subscription/notification details - check out this article for more info)

In this example links are sent as DiagnosticSource payload (as Activity does not support them directly).

public async Task SendMessagesAsync(IEnumerable<EventData> messages)
{
    // check if DiagnosticSource is enabled 
    bool isInstrumentationEnabled = diagnosticSource.IsEnabled() && diagnosticSource.IsEnabled("send");
    if (!isInstrumentationEnabled)
    {
        // do send ...
        return;
    }

    // and if it is, do the instrumentation:
    var sendActivity = new Activity("send");
    var eventDatas = messages as EventData[] ?? messages.ToArray();

    List<Activity> links = null;
    foreach (var msg in eventDatas)
    {
        // if message happen to have context already
        if (msg.SpanContext != null)
        {
            if (links == null) links = new List<Activity>();
            links.Add(msg.SpanContext);
        }
        // it might be also that user added context manually,
        else
        {
            var msgActivity = new Activity("message");
            diagnosticSource.StartActivity(msgActivity, null);

            // WARNING: byte[] properties are not supported yet by EventHub .NET SDK
             msg.Properties["Diagnostic-Id"] = msgActivity.Id;
             diagnosticSource.StopActivity(msgActivity, null);

            msg.SpanContext = msgActivity;
        }
    }

    diagnosticSource.StartActivity(sendActivity, null);
    if (sendActivity.Recorded)
    {
        sendActivity.AddTag("az.namespace", "Microsoft.EventHub");
        sendActivity.AddTag("message_bus.destination", this.entityPath);
        sendActivity.AddTag("peer.address", this.endpoint);
    }

    // do send 
    // ...

    diagnosticSource.StopActivity(sendActivity, new {Links = links});
}

private static readonly DiagnosticListener diagnosticSource = new DiagnosticListener("Azure.EventHubs");

Process messages

Message processing could be handled by user code or SDK. EventHubs SDKs provide EventHub processor host in (at least) some languages and the processor host gives user messages one-by-one (Java, track2) or list of the messages that likely carry different tracing contexts.

If SDK supports event processing loop and passes a single message to user, it MUST create span for message processing and make it a child of context from message (or orphan if there is no context).

If SDK supports event processing loop and passes array of messages to user, it MUST create span for message processing and link context from messages. Before processing events, SDK must read trace context from the messages, create a processing span and link all the messages context to the new span. SDK MUST add links before starting span.

If SDK does not awaits for user code to complete, it still MUST create processing span, attribute it with parent or links and MUST end it after user code is invoked

SDK MUST set attributes on each link

  • enqueuedTime attribute: unix epoch time with milliseconds precision representing when message was enqueued (x-opt-enqueued-time system property). Attribute value SHOULD have long type (if not possible, use string).

SDK MUST set attributes on the processing span

  • span name: Azure.EventHubs.process
  • message_bus.destination attribute: EventHub entity name
  • peer.address attribute: Fully qualified EventHub service endpoint such as sb://<name>.servicebus.windows.net/
  • kind: consumer
  • status

If users are interested in individual message tracing, they MUST create spans manually as children of each particular message. On the SDK level we do not know how users treat messages: aggregate or process individually. In case of aggregation, multiple traces from different messages are merging into a new trace. In case of individual processing (e.g. store content of each message in Cosmos), it could make sense to continue each transaction individually.

OpenTelemetry example

public void onEvents(PartitionContext partitionContext, Iterable<EventData> messages, Context context) {
  // check if we have context passed explicitly or implicitly
  // parent span is most likely null and it's ok
  Span parentSpan = (Span) context.getData(OPENTELEMETRY_SPAN_KEY).orElse(TRACER.getCurrentSpan());

  Span.Builder builder = tracer
      .spanBuilder("Azure.EventHubs.process")
      .setParent(parentSpan)
      .setSpanKind(Kind.CONSUMER); // use SERVER with OpenCensus, CONSUMER with opentelemetry

  for(EventData msg : messages){
    // TODO: optimization: If receive is instrumented, we might have extracted context then
    // and we should have cached it there
    SpanContext msgContext = AmqpPropagationFormat.extractContext(msg);
    // if message has context, link it to the span
    if (msgContext.isValid()) {
      builder.addLink(msgContext);
    }
  }

  Span span = builder.startSpan();
  Scope scope = tracer.withSpan(span);
  if (span.isRecordingEvents()) {
    span.setAttribute("az.namespace", "Microsoft.EventHub");
    span.setAttribute("message_bus.destination", this.entityPath);
    span.setAttribute("peer.address", this.endpoint);
  }

  try {
    // call users impl of IEventProcessor.onEvents
  } catch (Exception ex) {
    span.setStatus(Status.UNKNOWN.withDescription(ex.toString()));
  } finally {
    scope.close();
  }

  span.end();
}

Process single message

public void onEvent(PartitionContext context, EventData message) {
  Span.Builder builder = tracer
      .spanBuilder("Azure.EventHubs.process")
      .setSpanKind(Kind.COMSUMER);

  SpanContext msgContext = AmqpPropagationFormat.extractContext(message);
  // if message has context, make it parent of the span
  if (msgContext.isValid()) {
    builder.setParent(msgContext);
  }

  Span span = builder.startSpan();
  Scope scope = tracer.withSpan(span);
  if (span.isRecordingEvents()) {
    span.setAttribute("az.namespace", "Microsoft.EventHub");
    span.setAttribute("message_bus.destination", this.entityPath);
    span.setAttribute("peer.address", this.endpoint);
  }

  try {
    // call user's callback
  } catch (Exception ex) {
    span.setStatus(Status.UNKNOWN.withDescription(ex.toString()));
  } finally {
    scope.close();
  }

  span.end();
}

C# example

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    // check if DiagnosticSource is enabled 
    bool isInstrumentationEnabled = diagnosticSource.IsEnabled() && diagnosticSource.IsEnabled("process");
    if (!isInstrumentationEnabled)
    {
        // do the processing...
        // ... 

        return;
    }

    var activity = new Activity("process");

    List<Activity> links = null;
    foreach (var msg in messages)
    {
        if (msg.SpanContext != null)
        {
            if (links == null) links = new List<Activity>();
            links.Add(msg.SpanContext);
        }
    }

    diagnosticSource.StartActivity(activity, null);
    if (activity.Recorded)
    {
        activity.AddTag("az.namespace", "Microsoft.EventHub");
        activity.AddTag("message_bus.destination", this.entityPath);
        activity.AddTag("peer.address", this.endpoint);
    }

    // do the processing...
    // ... 

    diagnosticSource.StopActivity(activity, new { Links = links });
}

Receive messages

EventHubs SDK API allows receving batches of messages.

SDK MAY instrument receive call EventHubs prefetches messages and receiving on the heavily loaded service is likely a local operation. If there are no messages for a long time, receiving will result in timeout - which should not be treated as failure. Sampling decision will be made on receive span before it knows which messages it received, on the high-scale service with agressive sampling chances that receive call is sampled in could be low. Presumably majority of users don't call receive directly and use event processor host instead.

Considering all this, instrumenting receive call brings low value.

If SDK decides to instrument it, SDK MUST create a span for receive call and, if this span is sampled in, SDK MUST add links to each received message's context. Linking should be skipped if there is no context in the message. Linking may still happen if receive span is sampled out - it will happen anyway when message batch is processed (and SDK may try to optimize it further). Parsed span context could be stored on the EventData as internal property for optimization purposes (we'll need it again for message processing).

SDK MUST set attributes and other properties on spans if they are sampled in

  • span name: Azure.EventHubs.receive
  • kind: client
  • az.namespace attribute : Microsoft.EventHub
  • message_bus.destination attribute: EventHubs entity name
  • peer.address attribute: Fully qualified EventHubs service endpoint such as <name>.servicebus.windows.net
  • status

OpenTelemetry example

public static Iterable<EventData> receiveSync(int count, Context context) {
  // check if we have context passed explicitly or implicitly
  // parent Span is most likely null and it's ok
  Span parentSpan = (Span) context.getData(OPENTELEMETRY_SPAN_KEY).orElse(TRACER.getCurrentSpan());

  Span span = tracer.spanBuilder("Azure.EventHubs.receive")
      .setSpanKind(Kind.CLIENT)
      .setParent(parentSpan)
      .startSpan();

  Iterable<EventData> messages = null;
  try {
    // do receive ...
    messages = ehReceiver.receiveSync(count);
  } catch (EventHubException ex) {
    span.setStatus(Status.UNKNOWN.withDescription(ex.toString()));
    return messages;
  }

  for (EventData msg : messages) {
    SpanContext msgContext = AmqpPropagationFormat.extractContext(msg);
    if (msgContext.isValid() && span.isRecordingEvents()) {
      span.addLink(msgContext);
    }

    // we will need context again when processing event - let's cache it
    msg.SpanContext = msgContext;
  }

  if (span.isRecordingEvents()) {
    span.setAttribute("az.namespace", "Microsoft.EventHub");
    span.setAttribute("message_bus.destination", this.entityPath);
    span.setAttribute("peer.address", this.endpoint);
  }

  span.end();
  return messages;
}

C# example

public async Task<IEnumerable<EventData>> ReceiveAsync(int count)
{
    // check if DiagnosticSource is enabled 
    bool isInstrumentationEnabled = diagnosticSource.IsEnabled() && diagnosticSource.IsEnabled("receive");
    if (!isInstrumentationEnabled)
    {
        return await this.ReceiveInternalAsync(count);
    }

    var activity = new Activity("receive");
    diagnosticSource.StartActivity(activity, null);
    IEnumerable<EventData> receivedMessages = await this.ReceiveInternalAsync(count);

    List<Activity> links = null;
    foreach (var msg in receivedMessages)
    {
        if (msg.Properties.TryGetValue("Diagnostic-Id", out var traceparent))
        {
            if (links == null) links = new List<Activity>();

            var ctx = new Activity("message").SetParentId((string)traceparent);
            links.Add(ctx);
            msg.SpanContext = ctx;
        }
    }

    if (activity.Recorded)
    {
        activity.AddTag("az.namespace", "Microsoft.EventHub");
        activity.AddTag("message_bus.destination", this.entityPath);
        activity.AddTag("peer.address", this.endpoint);
    }

    diagnosticSource.StopActivity(activity, new {Links = links});

    return receivedMessages;
}

Other operations

SDK MUST instrument calls to the service that update checkpoint or remote state of the message.

If such calls happen within the scope of processing span or user span, they MUST become children of this span.

All service calls should be instrumented and MUST carry following info

  • span name: Azure.EventHubs.<operation-name>
  • kind: client
  • az.namespace attribute : Microsoft.EventHub
  • message_bus.destination attribute: EventHubs entity name
  • peer.address attribute: Fully qualified EventHubs service endpoint such as <name>.servicebus.windows.net
  • status

It may be useful to add other attributes such as partition or offset.

Extract/Inject Helpers

Advanced scenarios may involve creating context manually and injecting it into the message: e.g. service is a proxy that transforms one kind of message into Event Hub event and sends it.

More typical scenario is when users what to have more control over message processing e.g. when they process messages individually and want to extract context and start a new span from it.

SDK MUST provide helper method to extract and context and MAY provide helper method to inject context. While extraction and injections could be handled by OpenTelemetry propagation formats and inject/extract methods, it makes user write non-trival code and know exactly which format is used for trace context by EventHubs SDK (and we now use something very custom).

Such methods may be shipped in OpenTelemetry plugin packages or (in case of .NET) could be exposed on the SDK layer.

Proposed public methods to extract context from the message

Java

package com.azure.tracing.opentelemetry;

public class AmqpPropagationFormat
{
  public static SpanContext extractContext(EventData message)
  {
    String diagnosticId = message.getProperties().get("Diagnostic-Id").toString();

    return fromDiagnosticId(diagnosticId);
  }

  static final SpanContext fromDiagnosticId(String diagnosticId) {
    if (diagnosticId == null || diagnosticId.length() < 55 || !diagnosticId.startsWith("00")) {
      return SpanContext.create(TraceId.getInvalid(), SpanId.getInvalid(), TraceOptions.getDefault(), Tracestate
          .getDefault());
    }
    return SpanContext.create(
        TraceId.fromLowerBase16(diagnosticId, 3),
        SpanId.fromLowerBase16(diagnosticId, 36),
        TraceOptions.fromLowerBase16(diagnosticId, 53),
        Tracestate.getDefault());
  }
}

Usage example: User who do not use Event Processor Host should be able to write code similar to below one to process messages one-by-one or do batching depending on their scenario.

public void manualProcessingLoop() throws EventHubException {
  Iterable<EventData> messages = ehReceiver.receiveSync(5);

  for (EventData msg : messages) {
  Span span = tracer
    .spanBuilder("process message")
    .setParent(AmqpPropagationFormat.extractContext(msg))
    .startSpan();

 try (Scope _ = tracer.withSpan(span)) {

  // process message

    span.setStatus(Status.OK);
  } catch (Exception ex) {
    span.setStatus(Status.UNKNOWN.withDescription(ex.toString()));
  }

  span.end();
  }
}

C#

In .NET same coudld be done with extenstions methods how it's used to be in track 1

public static class AmqpPropagationFormat
{
  public static Activity ExtractActivity(EventData message)
  {
    var activity = new Activity("message");
    if (message.Properties.TryGetValue("Diagnostic-Id", out var diagnosticId))
    {
	// activity will validate it
	activity.SetParentId(diagnosticId.ToString());
    }

    return activity;
  }
}

Usage example:

static async Task ManualProcessingLoop(EventHubClient eventHubClient, string partitionId)
{
    var tracer = Tracing.Tracer;
    var receiver = eventHubClient.CreateReceiver("$Default", partitionId, DateTime.UtcNow);

    while (true)
    {
	IEnumerable<EventData> messages = await receiver.ReceiveAsync(5);

	foreach (EventData message in messages)
	{
	    var span = tracer.SpanBuilder("process message")
		.SetParent(AmqpPropagationFormat.ExtractActivity(message))
		.StartSpan();

	    // process messages
	    span.End();
	}
    }
}

Mapping to Azure Monitor (Application Insights) telemetry

If exported span has az.namespace attribute with Microsoft.EventHub value

  1. Spans with CLIENT kind translate into DependencyTelemetery

    • type : Microsoft.EventHub
    • target : attributes["net.peer.name"]/attributes["messaging.destination.name"]
    • links (see below)
  2. PRODUCER spans: DependencyTelemetery

    • type : Queue Message | Microsoft.EventHub
    • target : attributes["net.peer.name"]/attributes["messaging.destination.name"]
  3. CONSUMER spans: RequestTelemetry

    • source : attributes["net.peer.name"]/attributes["messaging.destination.name"]
    • customMeasurements["timeSinceEnqueued"] - mean time (in ms) among linked messages: go through the span.Links - each of them has enqueuedTime attribute. Calculate the difference between span start time (UTC) and this timestamp. Put mean value in custom measurements. Value could be negative, if it is, set it to 0.
    • links (see below)

All spans (not EventHub-specific):

  • if links are present, put them into custom properties _MS.links in json blob [{"operation_Id":link0.trace_id, "id":link0.span_id}]e.g. [{"operation_Id":"5eca8b153632494ba00f619d6877b134","id":"d4c1279b6e7b7c47"}, {"operation_Id":"ff28988d0776b44f9ca93352da126047","id":"bf4fa4855d161141"}]. Ignore tracestate, attributes and options
  • ResultCode : status.CanonicalCode
  • Success : status.isOk
  • name : span name
  • duration : span duration
  • INTERNAL spans if az.namespace attribute is present, translate into DependencyTelemetery with type = InProc | attributes["az.namespace"]
  • INTERNAL spans if az.namespace not present - translate into DependencyTelemetery with type = InProc
@johanste
Copy link

johanste commented Aug 1, 2019

In the first example:

public static void sendSync(Iterable<EventData> messages) {
  Span.Builder builder = tracer
      .spanBuilder("Azure.EventHubs.send")
      .setSpanKind(Kind.PRODUCER);

  for(EventData msg : messages){
    if (msg.SpanContext != null) {
      // if message has context, link it to the span
      builder.addLink((SpanContext)msg.SpanContext); // <-- doesn't this add another link to what is likely an ancestor to the current span?
      ....

I would assume that there is a fairly high probability that the message was created in the context of the current span. E.g. incoming HTTP request to web server, span is dehydrated or created and set as current, create an event in current span, call send, create child span, add link to messages span). Is this a problem?

EDIT: never mind - the span associated with the message would be a sibling (or cousin or something), not a direct ancestor.

@richardpark-msft
Copy link

richardpark-msft commented Aug 15, 2023

From an outside discussion, when you want to create child spans of the operation span you still want to have a minimal set of attributes:

  • messaging.destination.name
  • server.address

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment