// ---------------------Send Operations----------------------------------
/* Send events without specifying partition */
/**
 * Sends the given events with a single {@code send} call and no send options,
 * i.e. without naming a partition id or a partition key.
 *
 * @param eventDataList the events handed to the producer client
 */
private static void send(List<EventData> eventDataList) {
    // The empty connection string is a placeholder carried over from the API sketch.
    // try-with-resources closes the client once the send returns so it is not leaked.
    // NOTE(review): assumes EventHubProducerClient is Closeable (as in the published SDK) - confirm for this draft.
    try (EventHubProducerClient producerClient = new EventHubClientBuilder()
            .connectionString("")
            .buildProducerClient()) {
        producerClient.send(eventDataList);
    }
}
/* Send events using a partition key - not a specific partition id */
/**
 * Sends the given events with {@link SendOptions} that carry the partition key
 * {@code "partitionKey"}; no partition id is set.
 *
 * @param eventDataList the events handed to the producer client
 */
private static void sendWithPartitionKey(List<EventData> eventDataList) {
    SendOptions sendOptions = new SendOptions().setPartitionKey("partitionKey");

    // The empty connection string is a placeholder carried over from the API sketch.
    // try-with-resources closes the client once the send returns so it is not leaked.
    // NOTE(review): assumes EventHubProducerClient is Closeable (as in the published SDK) - confirm for this draft.
    try (EventHubProducerClient producerClient = new EventHubClientBuilder()
            .connectionString("")
            .buildProducerClient()) {
        producerClient.send(eventDataList, sendOptions);
    }
}
/* Send events to a specific partition id */
/**
 * Sends the given events with {@link SendOptions} whose partition id is set to
 * {@code "1"}.
 *
 * @param eventDataList the events handed to the producer client
 */
private static void sendToPartition(List<EventData> eventDataList) {
    SendOptions sendOptions = new SendOptions().setPartitionId("1");

    // The empty connection string is a placeholder carried over from the API sketch.
    // try-with-resources closes the client once the send returns so it is not leaked.
    // NOTE(review): assumes EventHubProducerClient is Closeable (as in the published SDK) - confirm for this draft.
    try (EventHubProducerClient producerClient = new EventHubClientBuilder()
            .connectionString("")
            .buildProducerClient()) {
        producerClient.send(eventDataList, sendOptions);
    }
}
/*
Send events to the same partition without re-using the AMQP link, while still sharing the same connection.
This mimics Track 1's creation of multiple producers from the same client.
We need to check with the service team whether this is a case we should care about.
Regardless, this is an advanced case; it can be an add-on and need not be in GA.
*/
/**
 * Creates one {@code EventHubConnection}, builds two producer clients from a
 * builder configured with that connection, and sends the same event list
 * through each of them, both with the partition id set to {@code "1"}.
 *
 * @param eventDataList the events sent once per producer client
 */
private static void sendToPartitionSeparateLinks(List<EventData> eventDataList) {
    // A single connection object is given to the builder that creates both producers.
    EventHubConnection sharedConnection = new EventHubConnection(connectionString);
    EventHubClientBuilder sharedBuilder = new EventHubClientBuilder().connection(sharedConnection);

    EventHubProducerClient firstProducer = sharedBuilder.buildProducerClient();
    EventHubProducerClient secondProducer = sharedBuilder.buildProducerClient();

    // Each producer gets its own options instance; both name the same partition id.
    SendOptions firstOptions = new SendOptions().setPartitionId("1");
    SendOptions secondOptions = new SendOptions().setPartitionId("1");

    firstProducer.send(eventDataList, firstOptions);
    secondProducer.send(eventDataList, secondOptions);
}
// ---------------------Receive Operations----------------------------------
/* Receive events from all partitions with no load balancing or checkpointing */
/**
 * Builds a consumer client with an event-processing callback and no partition
 * manager, then starts it and stops it.
 */
private static void receiveAll() {
    EventHubConsumerClient allPartitionsConsumer = new EventHubClientBuilder()
        .connectionString("")
        .consumerGroup("")
        .processEvents((partitionContext, receivedEvent) -> {
            // Event handling goes here; the partition id is read but its value is not used.
            partitionContext.getPartitionId();
            return Mono.empty();
        })
        .buildConsumerClient();

    // The sketch starts and stops the client back to back.
    allPartitionsConsumer.start();
    allPartitionsConsumer.stop();
}
/* Receive events from all partitions with load balancing and checkpointing */
/**
 * Builds a consumer client configured with the supplied partition manager and
 * an event-processing callback that returns the result of
 * {@code updateCheckpoint} for each event, then starts it and stops it.
 *
 * <p>The method name keeps its original spelling so existing references still resolve.
 *
 * @param partitionManager the partition manager passed to the builder
 */
private static void recieveAll(PartitionManager partitionManager) {
    EventHubConsumerClient checkpointingConsumer = new EventHubClientBuilder()
        .connectionString("")
        .consumerGroup("")
        .partitionManager(partitionManager)
        .processEvents((partitionContext, receivedEvent) -> {
            // Event handling goes here; the partition id is read but its value is not used.
            partitionContext.getPartitionId();
            // The callback's result is whatever updateCheckpoint returns for this event.
            return partitionContext.updateCheckpoint(receivedEvent);
        })
        .buildConsumerClient();

    // The sketch starts and stops the client back to back.
    checkpointingConsumer.start();
    checkpointingConsumer.stop();
}
/* Receive events from specific partitions */
/**
 * Creates one {@code EventHubConnection} and two partition consumers on it
 * (for {@code partitionId1} and {@code partitionId2}, same consumer group and
 * event position), then subscribes to each consumer's {@code receive()} result,
 * forwarding events to {@code processEvents1} and {@code processEvents2}.
 */
private static void receiveFromPartitions() {
    // Both partition consumers are constructed with the same connection object.
    EventHubConnection sharedConnection = new EventHubConnection(connectionString);

    EventHubPartitionConsumer firstConsumer =
        new EventHubPartitionConsumer(sharedConnection, consumerGroup, partitionId1, eventPosition);
    EventHubPartitionConsumer secondConsumer =
        new EventHubPartitionConsumer(sharedConnection, consumerGroup, partitionId2, eventPosition);

    // The values returned by subscribe(...) are not retained.
    firstConsumer.receive().subscribe(event -> processEvents1(event));
    secondConsumer.receive().subscribe(event -> processEvents2(event));
}
// Source: GitHub gist srnagar/fcbd428f15c010ee0e27fba16d82d777, "Event Hubs Java API review",
// created October 16, 2019 21:36.