Start from scratch - no blobs for either ownership or checkpoint
Ownership and checkpoint blobs for some partitions but not all
Ownership blobs for all partitions but no checkpoint blobs
Ownership blobs without ownerid in metadata and checkpoint blobs without sequence number and offset in metadata
Checkpoint blobs with only sequence number (no offset)
Checkpoint blobs with only offset (no sequence number)
package com .azure .messaging .eventhubs .checkpointstore .blob ;
import com .azure .messaging .eventhubs .EventData ;
import com .azure .messaging .eventhubs .EventHubClientBuilder ;
import com .azure .messaging .eventhubs .EventHubProducerAsyncClient ;
import java .util .concurrent .TimeUnit ;
public class EventProducer {
private static final String CONNECTION_STRING = "" ;
public static void main (String args []) throws InterruptedException {
EventHubProducerAsyncClient producer = new EventHubClientBuilder ()
.connectionString (CONNECTION_STRING )
.buildAsyncProducerClient ();
System .out .println ("Creating batch and sending events" );
producer .createBatch ()
.flatMap (batch -> {
batch .tryAdd (new EventData ("Test event" ));
return producer .send (batch );
}).repeat (() -> true )
.subscribe (unused -> {
System .out .println ("Message sent" );
},
error -> System .out .println ("Error sending event " + error .getMessage ()),
() -> System .out .println ("Complete sending" ));
TimeUnit .DAYS .sleep (1 );
producer .close ();
System .out .println ("Closing application" );
}
}
Event Processor with basic error handling and event processing
package com .azure .messaging .eventhubs .checkpointstore .blob ;
import com .azure .messaging .eventhubs .EventProcessorClient ;
import com .azure .messaging .eventhubs .EventProcessorClientBuilder ;
import com .azure .storage .blob .BlobContainerAsyncClient ;
import com .azure .storage .blob .BlobContainerClientBuilder ;
import java .util .concurrent .TimeUnit ;
public class EventProcessor {
private static final String STORAGE_CONNECTION_STRING ="" ;
private static final String SAS_TOKEN = "" ;
private static final String CONTAINER_NAME = "" ;
private static final String EH_CONNECTION_STRING = "" ;
private static final String CONSUMER_GROUP = "" ;
public static void main (String [] args ) throws InterruptedException {
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder ()
.connectionString (STORAGE_CONNECTION_STRING )
.containerName (CONTAINER_NAME )
.sasToken (SAS_TOKEN )
.buildAsyncClient ();
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore (blobContainerAsyncClient );
EventProcessorClient processor = new EventProcessorClientBuilder ()
.checkpointStore (blobCheckpointStore )
.connectionString (EH_CONNECTION_STRING )
.consumerGroup (CONSUMER_GROUP )
.processEvent (eventContext -> {
if (eventContext .getEventData ().getSequenceNumber () % 100 == 0 ) {
System .out .println (
"Updating checkpoint for partition " + eventContext .getPartitionContext ().getPartitionId ()
+ " with seq num " + eventContext .getEventData ().getSequenceNumber ());
eventContext .updateCheckpoint ();
}
})
.processError (errorContext -> {
System .out .println (
"Error " + errorContext .getPartitionContext ().getPartitionId () + " " + errorContext .getThrowable ()
.getMessage ());
})
.buildEventProcessorClient ();
processor .start ();
TimeUnit .DAYS .sleep (1 );
processor .stop ();
}
}
Event Processor with initialization handler error
package com .azure .messaging .eventhubs .checkpointstore .blob ;
import com .azure .messaging .eventhubs .EventProcessorClient ;
import com .azure .messaging .eventhubs .EventProcessorClientBuilder ;
import com .azure .storage .blob .BlobContainerAsyncClient ;
import com .azure .storage .blob .BlobContainerClientBuilder ;
import java .util .concurrent .TimeUnit ;
public class EventProcessor {
private static final String STORAGE_CONNECTION_STRING ="" ;
private static final String SAS_TOKEN = "" ;
private static final String CONTAINER_NAME = "" ;
private static final String EH_CONNECTION_STRING = "" ;
private static final String CONSUMER_GROUP = "" ;
public static void main (String [] args ) throws InterruptedException {
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder ()
.connectionString (STORAGE_CONNECTION_STRING )
.containerName (CONTAINER_NAME )
.sasToken (SAS_TOKEN )
.buildAsyncClient ();
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore (blobContainerAsyncClient );
EventProcessorClient processor = new EventProcessorClientBuilder ()
.checkpointStore (blobCheckpointStore )
.connectionString (EH_CONNECTION_STRING )
.consumerGroup (CONSUMER_GROUP )
.processPartitionInitialization (initContext -> {throw new IllegalStateException ("init error" );})
.processEvent (eventContext -> {
if (eventContext .getEventData ().getSequenceNumber () % 100 == 0 ) {
System .out .println (
"Updating checkpoint for partition " + eventContext .getPartitionContext ().getPartitionId ()
+ " with seq num " + eventContext .getEventData ().getSequenceNumber ());
eventContext .updateCheckpoint ();
}
})
.processError (errorContext -> {
System .out .println (
"Error " + errorContext .getPartitionContext ().getPartitionId () + " " + errorContext .getThrowable ()
.getMessage ());
})
.buildEventProcessorClient ();
processor .start ();
TimeUnit .DAYS .sleep (1 );
processor .stop ();
}
}
Event Processor with process event handler error
package com .azure .messaging .eventhubs .checkpointstore .blob ;
import com .azure .messaging .eventhubs .EventProcessorClient ;
import com .azure .messaging .eventhubs .EventProcessorClientBuilder ;
import com .azure .storage .blob .BlobContainerAsyncClient ;
import com .azure .storage .blob .BlobContainerClientBuilder ;
import java .util .concurrent .TimeUnit ;
public class EventProcessor {
private static final String STORAGE_CONNECTION_STRING ="" ;
private static final String SAS_TOKEN = "" ;
private static final String CONTAINER_NAME = "" ;
private static final String EH_CONNECTION_STRING = "" ;
private static final String CONSUMER_GROUP = "" ;
public static void main (String [] args ) throws InterruptedException {
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder ()
.connectionString (STORAGE_CONNECTION_STRING )
.containerName (CONTAINER_NAME )
.sasToken (SAS_TOKEN )
.buildAsyncClient ();
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore (blobContainerAsyncClient );
EventProcessorClient processor = new EventProcessorClientBuilder ()
.checkpointStore (blobCheckpointStore )
.connectionString (EH_CONNECTION_STRING )
.consumerGroup (CONSUMER_GROUP )
.processEvent (eventContext -> { throw new IllegalStateException ("process error" ); })
.processError (errorContext -> {
System .out .println (
"Error " + errorContext .getPartitionContext ().getPartitionId () + " " + errorContext .getThrowable ()
.getMessage ());
})
.buildEventProcessorClient ();
processor .start ();
TimeUnit .DAYS .sleep (1 );
processor .stop ();
}
}
Event Processor with close handler error
package com .azure .messaging .eventhubs .checkpointstore .blob ;
import com .azure .messaging .eventhubs .EventProcessorClient ;
import com .azure .messaging .eventhubs .EventProcessorClientBuilder ;
import com .azure .storage .blob .BlobContainerAsyncClient ;
import com .azure .storage .blob .BlobContainerClientBuilder ;
import java .util .concurrent .TimeUnit ;
public class EventProcessor {
private static final String STORAGE_CONNECTION_STRING ="" ;
private static final String SAS_TOKEN = "" ;
private static final String CONTAINER_NAME = "" ;
private static final String EH_CONNECTION_STRING = "" ;
private static final String CONSUMER_GROUP = "" ;
public static void main (String [] args ) throws InterruptedException {
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder ()
.connectionString (STORAGE_CONNECTION_STRING )
.containerName (CONTAINER_NAME )
.sasToken (SAS_TOKEN )
.buildAsyncClient ();
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore (blobContainerAsyncClient );
EventProcessorClient processor = new EventProcessorClientBuilder ()
.checkpointStore (blobCheckpointStore )
.connectionString (EH_CONNECTION_STRING )
.consumerGroup (CONSUMER_GROUP )
.processPartitionClose (closeContext -> {throw new IllegalStateException ("close error" );})
.processEvent (eventContext -> {
if (eventContext .getEventData ().getSequenceNumber () % 100 == 0 ) {
System .out .println (
"Updating checkpoint for partition " + eventContext .getPartitionContext ().getPartitionId ()
+ " with seq num " + eventContext .getEventData ().getSequenceNumber ());
eventContext .updateCheckpoint ();
}
})
.processError (errorContext -> {
System .out .println (
"Error " + errorContext .getPartitionContext ().getPartitionId () + " " + errorContext .getThrowable ()
.getMessage ());
})
.buildEventProcessorClient ();
processor .start ();
TimeUnit .DAYS .sleep (1 );
processor .stop ();
}
}
Event Processor with inital event position
package com .azure .messaging .eventhubs .checkpointstore .blob ;
import com .azure .messaging .eventhubs .EventProcessorClient ;
import com .azure .messaging .eventhubs .EventProcessorClientBuilder ;
import com .azure .storage .blob .BlobContainerAsyncClient ;
import com .azure .storage .blob .BlobContainerClientBuilder ;
import java .util .concurrent .TimeUnit ;
public class EventProcessor {
private static final String STORAGE_CONNECTION_STRING ="" ;
private static final String SAS_TOKEN = "" ;
private static final String CONTAINER_NAME = "" ;
private static final String EH_CONNECTION_STRING = "" ;
private static final String CONSUMER_GROUP = "" ;
public static void main (String [] args ) throws InterruptedException {
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder ()
.connectionString (STORAGE_CONNECTION_STRING )
.containerName (CONTAINER_NAME )
.sasToken (SAS_TOKEN )
.buildAsyncClient ();
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore (blobContainerAsyncClient );
Map <String , EventPosition > initPositionMap = new HashMap <>() {{
put ("0" , EventPosition .earliest ());
put ("1" , EventPosition .fromSequenceNumber (10 ));
}};
EventProcessorClient processor = new EventProcessorClientBuilder ()
.checkpointStore (blobCheckpointStore )
.connectionString (EH_CONNECTION_STRING )
.consumerGroup (CONSUMER_GROUP )
.initialPartitionEventPosition (initPositionMap )
.processEvent (eventContext -> {
if (eventContext .getEventData ().getSequenceNumber () % 100 == 0 ) {
System .out .println (
"Updating checkpoint for partition " + eventContext .getPartitionContext ().getPartitionId ()
+ " with seq num " + eventContext .getEventData ().getSequenceNumber ());
eventContext .updateCheckpoint ();
}
})
.processError (errorContext -> {
System .out .println (
"Error " + errorContext .getPartitionContext ().getPartitionId () + " " + errorContext .getThrowable ()
.getMessage ());
})
.buildEventProcessorClient ();
processor .start ();
TimeUnit .DAYS .sleep (1 );
processor .stop ();
}
}