- The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.
- Details unrelated to the high-level concept are not illustrated; the scope is limited to the high-level shape and paradigms of the feature area.
- Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the Event Grid types, it can be assumed to be for illustration only. These methods most often use an ellipsis for the parameter list to help differentiate them.
- Client hierarchy
- Open questions
- Usage examples
- Implicit interoperability questions and concerns
- References and resources
- EventGridTopicClient: The top-level client for interacting with a given topic of an Event Grid namespace. It represents a single AMQP connection shared by any sub-clients spawned from it.
- EventGridQueueReader: A sub-client allowing events from a specific subscription of a topic to be read using a pull-based approach with queue semantics.
- EventGridQueueProcessor: A sub-client allowing events from a specific subscription of a topic to be read using a push-based approach with queue semantics.
- EventGridStreamReader: A sub-client allowing events from a specific subscription of a topic to be read using a pull-based approach with stream semantics.
- EventGridStreamProcessor: A sub-client allowing events from a specific subscription of a topic to be read using a push-based approach with stream semantics.
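The shared-connection relationship in the hierarchy above can be sketched as follows. This is only an illustration under stated assumptions: `FakeAmqpConnection` and the `EventGridSubClient` base type are hypothetical and do not appear in the proposal, the `CreateQueueProcessor` and `CreateStreamProcessor` signatures are simplified, and the constructor omits the endpoint and credential for brevity.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the shared AMQP connection.
public sealed class FakeAmqpConnection
{
    public bool IsClosed { get; private set; }

    public void Close() => IsClosed = true;
}

// Hypothetical base type; the proposal does not define one.
public abstract class EventGridSubClient : IAsyncDisposable
{
    public FakeAmqpConnection Connection { get; }

    public string SubscriptionName { get; }

    protected EventGridSubClient(FakeAmqpConnection connection, string subscriptionName)
    {
        Connection = connection;
        SubscriptionName = subscriptionName;
    }

    // A sub-client releases only its own resources; the connection belongs to the parent.
    public ValueTask DisposeAsync() => default;
}

public sealed class EventGridQueueReader : EventGridSubClient
{
    public EventGridQueueReader(FakeAmqpConnection connection, string subscriptionName)
        : base(connection, subscriptionName) { }
}

public sealed class EventGridQueueProcessor : EventGridSubClient
{
    public EventGridQueueProcessor(FakeAmqpConnection connection, string subscriptionName)
        : base(connection, subscriptionName) { }
}

public sealed class EventGridStreamReader : EventGridSubClient
{
    public EventGridStreamReader(FakeAmqpConnection connection, string subscriptionName)
        : base(connection, subscriptionName) { }
}

public sealed class EventGridStreamProcessor : EventGridSubClient
{
    public EventGridStreamProcessor(FakeAmqpConnection connection, string subscriptionName)
        : base(connection, subscriptionName) { }
}

public sealed class EventGridTopicClient : IAsyncDisposable
{
    // One connection per topic client, handed to every sub-client it creates.
    private readonly FakeAmqpConnection _connection = new FakeAmqpConnection();

    public string TopicName { get; }

    public EventGridTopicClient(string topicName) => TopicName = topicName;

    public EventGridQueueReader CreateQueueReader(string subscriptionName) =>
        new EventGridQueueReader(_connection, subscriptionName);

    public EventGridQueueProcessor CreateQueueProcessor(string subscriptionName) =>
        new EventGridQueueProcessor(_connection, subscriptionName);

    public EventGridStreamReader CreateStreamReader(string subscriptionName) =>
        new EventGridStreamReader(_connection, subscriptionName);

    public EventGridStreamProcessor CreateStreamProcessor(string subscriptionName) =>
        new EventGridStreamProcessor(_connection, subscriptionName);

    // Disposing the topic client closes the connection for all of its sub-clients.
    public ValueTask DisposeAsync()
    {
        _connection.Close();
        return default;
    }
}
```

In this sketch, disposing a sub-client leaves the connection open for its siblings, while disposing the topic client closes it for everyone. Whether the real clients would behave this way is a design decision that the proposal leaves open.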
- Can any service operation be performed at the namespace level, or is every operation bound to a specific topic?
- What are the intended use cases for queue versus stream semantics? These examples assume that queue readers are interested in deterministic processing with explicit control and stream readers are most interested in throughput.
- Will checkpoints for streams be external like Event Hubs or broker-owned like Kafka? These examples assume external, despite strong customer feedback for broker-owned checkpoints.
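To make the "external" option in the checkpoint question concrete, here is a minimal sketch in which the application, rather than the broker, records the last processed position for each topic and subscription. The `ICheckpointStore` interface and `InMemoryCheckpointStore` class are hypothetical names used only for illustration. The methods are synchronous for brevity, and a real store would persist to durable storage rather than memory.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical contract: the application owns checkpoint persistence.
public interface ICheckpointStore
{
    // Returns the last recorded sequence number, or null when none exists.
    long? GetCheckpoint(string topicName, string subscriptionName);

    void UpdateCheckpoint(string topicName, string subscriptionName, long sequenceNumber);
}

public sealed class InMemoryCheckpointStore : ICheckpointStore
{
    private readonly ConcurrentDictionary<(string Topic, string Subscription), long> _positions =
        new ConcurrentDictionary<(string Topic, string Subscription), long>();

    public long? GetCheckpoint(string topicName, string subscriptionName) =>
        _positions.TryGetValue((topicName, subscriptionName), out var position)
            ? position
            : (long?)null;

    // Checkpoints only move forward; an older sequence number is ignored.
    public void UpdateCheckpoint(string topicName, string subscriptionName, long sequenceNumber) =>
        _positions.AddOrUpdate(
            (topicName, subscriptionName),
            sequenceNumber,
            (_, existing) => Math.Max(existing, sequenceNumber));
}
```

With a broker-owned model, none of this state would live in the application; the service would track the position for each subscription itself.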
The EventGridTopicClient supports the standard set of constructors for token credentials and key credentials. Because it maintains a stateful connection, it implements IAsyncDisposable and is expected to be disposed when no longer in use.
// Create the client using a shared key credential.
var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new AzureKeyCredential("<< KEY VALUE >>");

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
// Create the client using a token credential.
var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
// Create the client with custom options.
var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

var options = new EventGridTopicClientOptions
{
    TransportType = EventGridTransportType.AmqpWebsockets,
    Identifier = "Squire-Node-11",
    ConnectionIdleTimeout = TimeSpan.FromSeconds(90)
};

await using var client = new EventGridTopicClient(
    endpoint,
    topicName,
    credential,
    options);
All events are published in the CloudEvent format, with access to the full AMQP message available for advanced scenarios. Publishing is done using the EventGridTopicClient and follows the existing patterns established for the Azure messaging services.
// Publish a single event.
var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);

var cloudEvent = new CloudEvent(
    source: "some-source",
    type: "com.microsoft.squire-example",
    data: BinaryData.FromBytes(new byte[] { 0x1, 0x2 }),
    format: CloudEventDataFormat.Binary);

await client.PublishCloudEventAsync(cloudEvent);
// Publish a set of events in a single call.
var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);

await client.PublishCloudEventsAsync(new[]
{
    new CloudEvent("some-source", "some-type", "data!"),
    new CloudEvent("some-source", "other-type", "moardata!")
});
// Publish a set of events using size-limited batches.
var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);

// The batch is reassigned below, so it cannot be declared with "using";
// it is disposed explicitly instead.
var batch = await client.CreateBatchAsync();

try
{
    while (TryGetNextCloudEvent(..., out var currentEvent))
    {
        if (!batch.TryAdd(currentEvent))
        {
            // If the batch is empty and the event does not fit, then it
            // is too large to ever be published.
            if (batch.Count == 0)
            {
                throw new Exception("There was an event too large to fit into any batch.");
            }

            // The batch is full. Publish it and create a new batch for
            // the event that wouldn't fit.
            await client.PublishCloudEventsAsync(batch);
            batch.Dispose();
            batch = await client.CreateBatchAsync();

            if (!batch.TryAdd(currentEvent))
            {
                throw new Exception("There was an event too large to fit into any batch.");
            }
        }
    }

    // If there are any events in the batch, publish a partial
    // batch, as we have no more events.
    if (batch.Count > 0)
    {
        await client.PublishCloudEventsAsync(batch);
    }
}
finally
{
    batch.Dispose();
}
// Read and acknowledge events one at a time using queue semantics.
var cancellationToken = GetCancellationToken(...);

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
await using var reader = client.CreateQueueReader(subscriptionName);

while (true)
{
    var cloudEvent = await reader.ReadCloudEventAsync(cancellationToken);

    if (cloudEvent is not null)
    {
        await ProcessEventAsync(cloudEvent, cancellationToken, ...);
        await reader.AcknowledgeCloudEventAsync(cloudEvent);
    }

    cancellationToken.ThrowIfCancellationRequested();
}
// Read events in batches using queue semantics.
var cancellationToken = GetCancellationToken(...);

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
await using var reader = client.CreateQueueReader(subscriptionName);

while (true)
{
    var eventBatch = await reader.ReadCloudEventsAsync(maxEvents: 10, cancellationToken);
    await ProcessAndAcknowledgeBatchAsync(eventBatch, cancellationToken, ...);

    cancellationToken.ThrowIfCancellationRequested();
}
// Process events as they are pushed, using queue semantics.
var cancellationToken = GetCancellationToken(...);

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

var options = new EventGridQueueProcessorOptions
{
    AutoAcknowledgeMessages = false,
    MaxConcurrentCalls = 2
};

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
await using var processor = client.CreateQueueProcessor(subscriptionName, options);

processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ErrorHandler;

// Auto-acknowledgement is disabled, so the handler acknowledges each
// event after it has been processed.
async Task ProcessEventHandler(ProcessCloudEventQueueArgs args)
{
    await ProcessEventAsync(args.CloudEvent, args.CancellationToken, ...);
    await args.AcknowledgeCloudEventAsync(args.CloudEvent);
}

Task ErrorHandler(ProcessErrorEventArgs args) =>
    LogProcessorError(
        args.FullyQualifiedNamespace,
        args.TopicName,
        args.SubscriptionName,
        args.ErrorSource,
        args.Exception.ToString());

await processor.StartProcessingAsync();

try
{
    // Processing runs in the background until cancellation is requested.
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException)
{
    // Expected
}
finally
{
    await processor.StopProcessingAsync();
}
At the time of writing, the Event Grid service design for streams has not been completed. The proposed client surface is based on Event Hubs patterns and expected Event Grid stream operations.
// Read events in batches using stream semantics.
var cancellationToken = GetCancellationToken(...);

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
await using var reader = client.CreateStreamReader(subscriptionName, EventStreamPosition.Earliest);

while (true)
{
    var eventBatch = await reader.ReadCloudEventsAsync(
        maxBatchSize: 100,
        maxWaitTime: TimeSpan.FromSeconds(15),
        cancellationToken);

    foreach (var cloudEvent in eventBatch)
    {
        await ProcessEventAsync(cloudEvent, cancellationToken, ...);
    }

    cancellationToken.ThrowIfCancellationRequested();
}
// Process events as they are pushed, using stream semantics with
// checkpoints held in an external store.
var cancellationToken = GetCancellationToken(...);

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var storageClient = new BlobContainerClient(
    storageConnectionString,
    blobContainerName);

var checkpointStore = new BlobCheckpointStore(storageClient);

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

var options = new EventGridStreamProcessorOptions
{
    MaxBatchSize = 100,
    PrefetchCount = 300,
    MaxWaitTime = TimeSpan.FromSeconds(30),
    MaxConcurrentBatches = 5
};

await using var client = new EventGridTopicClient(endpoint, topicName, credential);

await using var processor = client.CreateStreamProcessor(
    checkpointStore,
    subscriptionName,
    EventStreamPosition.FromSequenceNumber(1234),
    options);

processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ErrorHandler;

async Task ProcessEventHandler(ProcessCloudEventStreamArgs args)
{
    CloudEvent lastEvent = null;

    foreach (var cloudEvent in args.CloudEvents)
    {
        await ProcessEventAsync(cloudEvent, args.CancellationToken, ...);
        lastEvent = cloudEvent;
    }

    // Checkpoint only when the batch contained at least one event.
    if (lastEvent is not null)
    {
        await args.UpdateCheckpoint(lastEvent);
    }
}

Task ErrorHandler(ProcessErrorEventArgs args) =>
    LogProcessorError(
        args.FullyQualifiedNamespace,
        args.TopicName,
        args.SubscriptionName,
        args.ErrorSource,
        args.Exception.ToString());

await processor.StartProcessingAsync();

try
{
    // Processing runs in the background until cancellation is requested.
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException)
{
    // Expected
}
finally
{
    await processor.StopProcessingAsync();
}
- Each service uses different auth scopes. How would the client understand what scope is needed if authorization fails when pointed at Event Hubs or Service Bus?
- Event Grid uses different token types for shared keys and SAS. Will they be supported by Service Bus and Event Hubs?
- The entity paths used by Event Grid do not match those used by Service Bus. I expect the same will be the case for Event Hubs.
- Partitions are an implicit concept in Event Grid, but they are explicit and very important in Event Hubs. I do not see a way that Event Grid stream readers could work against Event Hubs without new service features.
- Will Event Grid be using the Service Bus names for management link-based operations and the same structures for request/response? (ref)
- Will Event Grid be supporting the full AMQP message? If so, will that include the vendor-specific application property types for TimeSpan, URI, and DateTimeOffset? (ref)
- Will Event Grid be supporting the WebSocket prefix used by Service Bus and Event Hubs? (ref)
- Will Event Grid be supporting batch-based message settlements? If so, how would that translate to Service Bus?