/**
 * Public surface of the event processor (name to be changed).
 *
 * Declared as an ambient class because this is an API shape only: the constructor and
 * methods carry no implementation here. `async` is omitted from the signatures since the
 * modifier is only meaningful on an implementation; the `Promise<void>` return types
 * already express the asynchronous contract.
 */
export declare class EventProcessor {
  /**
   * @param consumerGroupName - Name of the consumer group to use.
   * @param eventHubClient - Client for the Event Hub.
   * @param partitionProcessorFactory - Factory used to create a PartitionProcessor.
   * @param partitionManager - The plugin for checkpoint and partition ownership management.
   * @param options - Optional settings; see EventProcessorOptions.
   */
  constructor(
    consumerGroupName: string,
    eventHubClient: EventHubClient,
    partitionProcessorFactory: PartitionProcessorFactory,
    partitionManager: PartitionManager,
    options?: EventProcessorOptions
  );

  /** Starts the processor. Resolves with no value. */
  public start(): Promise<void>;

  /** Stops the processor. Resolves with no value. */
  public stop(): Promise<void>;
}
/**
 * Options passed when creating an EventProcessor. Every field is optional.
 */
export interface EventProcessorOptions {
// Open question: should this become an EventPositionProvider moving forward?
// It stays as an EventPosition for the upcoming preview.
initialEventPosition?: EventPosition;
// NOTE(review): presumably the maximum number of events delivered per batch — confirm.
maxBatchSize?: number;
// NOTE(review): the time unit (seconds vs. milliseconds) is not specified here — confirm.
maxWaitTime?: number;
}
/**
 * The PartitionProcessorFactory is called by EPH whenever a new partition is about to be processed.
 */
export interface PartitionProcessorFactory {
  /**
   * Creates the processor for a partition.
   *
   * @param context - Information about the partition that is about to be processed.
   * @param checkpointManager - Lets the created processor create checkpoints.
   * @returns The PartitionProcessor for that partition.
   */
  createPartitionProcessor(
    context: PartitionContext,
    checkpointManager: CheckpointManager
  ): PartitionProcessor;
}
/**
 * Contains information about the partition the EventProcessor will be processing events from.
 * Since it is passed to the factory call that creates the PartitionProcessor, we don't need to
 * pass it to each method inside the processor like we did in Track 1.
 *
 * Checkpointing logic is moved out of the context for better discoverability.
 *
 * NOTE(review): no constructor or initializers are shown for the readonly fields below;
 * presumably the library populates them when it creates the context — confirm.
 */
export class PartitionContext {
// Id of the partition being processed.
public readonly partitionId: string;
// Name of the Event Hub the partition belongs to.
public readonly eventHubName: string;
// Name of the consumer group used for processing.
public readonly consumerGroupName: string;
}
/**
 * CheckpointManager is created by the library and passed to the user's code to let them
 * create a checkpoint.
 * This was not needed in Track 1 as the checkpoint() function was available on the PartitionContext.
 * This may not be needed in Track 2 if passing a lambda function works.
 */
export class CheckpointManager {
  private partitionContext: PartitionContext; // for internal use by createCheckpoint
  private partitionManager: PartitionManager; // for internal use by createCheckpoint

  /**
   * Creates a checkpoint from a received event.
   *
   * @param eventData - The event to checkpoint at.
   */
  public createCheckpoint(eventData: EventData): Promise<void>;
  /**
   * Creates a checkpoint from an explicit position.
   *
   * @param offset - Offset to record in the checkpoint.
   * @param sequenceNumber - Sequence number to record in the checkpoint.
   */
  public createCheckpoint(offset: number, sequenceNumber: number): Promise<void>;
  // Single implementation behind both overloads; a class may not contain two bodies
  // for the same method name.
  public async createCheckpoint(
    eventDataOrOffset: EventData | number,
    sequenceNumber?: number
  ): Promise<void> {
    // Left unimplemented in this API sketch, exactly as in the original empty bodies.
  }
}
/**
 * Callbacks the user implements to process a single partition.
 *
 * The members are plain function types returning a Promise; `async` is an implementation
 * modifier and is not valid inside a function type, so it is not written here.
 */
export interface PartitionProcessor {
  /**
   * Optional. Called when EPH begins processing a partition.
   * Python has __init__() which means something else altogether
   */
  initialize?: () => Promise<void>;
  /**
   * Optional. Called when EPH stops processing a partition.
   * This may occur when control of the partition switches to another EPH or when user stops EPH
   *
   * @param reason - Why processing of the partition is stopping.
   */
  close?: (reason: CloseReason) => Promise<void>;
  /**
   * Called when a batch of events has been received.
   *
   * @param events - The received batch.
   */
  processEvents: (events: EventData[]) => Promise<void>;
  /**
   * Called when the underlying client experiences an error while receiving.
   *
   * @param error - The error that was encountered.
   */
  processError: (error: Error) => Promise<void>;
}
/**
 * Interface for the plugin to be passed when creating the EventProcessor
 * to manage partition ownership and checkpoint creation.
 * Deals mainly with read/write to the chosen storage service.
 *
 * Method signatures carry no `async` modifier (it is not allowed on interface members);
 * implementations are free to be `async` since each method returns a Promise.
 */
export interface PartitionManager {
  /**
   * Lists the partition ownerships for an Event Hub and consumer group.
   *
   * @param eventHubName - Name of the Event Hub.
   * @param consumerGroupName - Name of the consumer group.
   * @returns The ownership records found.
   */
  listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
  /**
   * Attempts to claim the given partition ownerships.
   *
   * @param partitionOwnerships - The ownerships to claim.
   * @returns Ownership records; presumably those that were successfully claimed — confirm.
   */
  claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
  /**
   * Creates a checkpoint.
   *
   * @param checkpoint - The checkpoint to create.
   */
  createCheckpoint(checkpoint: Checkpoint): Promise<void>;
}
/**
 * Ownership record for a single partition.
 * Used by PartitionManager to claim ownership, and returned by listOwnerships.
 *
 * Exported because it appears in the public signatures of the exported PartitionManager
 * interface, so plugin implementers need to be able to reference it.
 */
export interface PartitionOwnership {
  eventHubName: string;
  consumerGroupName: string;
  /** NOTE(review): presumably identifies the processor instance holding the ownership — confirm. */
  instanceId: string;
  partitionId: string;
  ownerLevel: number;
  offset?: number;
  sequenceNumber?: number;
  /** Last modified time, in ms. */
  lastModifiedTime?: number;
  /** NOTE(review): presumably an entity tag supplied by the storage service — confirm. */
  ETag?: string;
}
/**
 * Checkpoint data; used by createCheckpoint in PartitionManager.
 *
 * Exported because it appears in the public signature of the exported PartitionManager
 * interface, so plugin implementers need to be able to reference it.
 */
export interface Checkpoint {
  eventHubName: string;
  consumerGroupName: string;
  /** NOTE(review): presumably identifies the processor instance creating the checkpoint — confirm. */
  instanceId: string;
  partitionId: string;
  sequenceNumber: number;
  offset: number;
}
Last active
July 26, 2019 01:38
-
-
Save ramya-rao-a/6ce030240112063e12a23477697b01f2 to your computer and use it in GitHub Desktop.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The Python API defines fewer classes.
Python example: