Skip to content

Instantly share code, notes, and snippets.

@jonuwz
Last active April 17, 2024 17:51
Show Gist options
  • Save jonuwz/ce5662093852224d3a22fbefb9351df9 to your computer and use it in GitHub Desktop.
Save jonuwz/ce5662093852224d3a22fbefb9351df9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
An example to show receiving events from an Event Hub.
Create a .env file with the content :
EVENTHUB_FQDN="XXXXXXXX.servicebus.windows.net"
EVENTHUB_NAME="your_event_hub_name"
CONSUMER_GROUP='$Default' # make sure you change this if it'll affect other consumers
AZURE_TENANT_ID="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
AZURE_CLIENT_ID="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
AZURE_CLIENT_SECRET="the_secret"
"""
import os
from azure.eventhub import EventHubConsumerClient
from azure.identity import EnvironmentCredential
from dotenv import load_dotenv
# Read the ".env" file described in the module docstring; the settings it
# holds are the ones looked up through os.environ in the __main__ section.
load_dotenv(".env")
def on_event(partition_context, event):
    """Callback for a received event: report its partition, then print it.

    Args:
        partition_context: object exposing ``partition_id`` for the partition
            the event was read from.
        event: the received event; printed as-is.
    """
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
    template = "Received event from partition: {}."
    source_partition = partition_context.partition_id
    print(template.format(source_partition))
    print(event)
def on_partition_initialize(partition_context):
    """Callback announcing that a partition has been initialized.

    Args:
        partition_context: object exposing ``partition_id`` for the partition
            that was initialized.
    """
    # Put your code here.
    template = "Partition: {} has been initialized."
    print(template.format(partition_context.partition_id))
def on_partition_close(partition_context, reason):
    """Callback announcing that a partition has been closed, and why.

    Args:
        partition_context: object exposing ``partition_id`` for the partition
            that was closed.
        reason: the reason for closing; formatted into the printed message.
    """
    # Put your code here.
    closed_partition = partition_context.partition_id
    template = "Partition: {} has been closed, reason for closing: {}."
    print(template.format(closed_partition, reason))
def on_error(partition_context, error):
    """Callback reporting an error raised while receiving.

    Args:
        partition_context: object exposing ``partition_id`` for the partition
            on which the error occurred, or ``None`` when the error is not
            tied to a partition.
        error: the exception (or error value) to report.
    """
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context:
        # The template names the exception first and the partition second, so
        # the arguments must be passed in that order (they were swapped before).
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            error,
            partition_context.partition_id,
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))
if __name__ == '__main__':
    # Script entry point: build a consumer client from the environment settings
    # and receive events until the user interrupts with Ctrl+C.
    credential = EnvironmentCredential()

    # Connection settings are read from the environment; a missing variable
    # raises KeyError here, before the client is constructed.
    connection_settings = dict(
        fully_qualified_namespace=os.environ["EVENTHUB_FQDN"],
        consumer_group=os.environ["CONSUMER_GROUP"],
        eventhub_name=os.environ["EVENTHUB_NAME"],
        credential=credential,
    )
    consumer_client = EventHubConsumerClient(**connection_settings)

    # Callbacks defined above, wired to the receive() keyword arguments.
    callbacks = dict(
        on_event=on_event,
        on_partition_initialize=on_partition_initialize,
        on_partition_close=on_partition_close,
        on_error=on_error,
    )

    try:
        # The client is used as a context manager around the receive call.
        with consumer_client:
            # NOTE(review): "@latest" presumably means only events arriving
            # after startup are delivered -- confirm against the SDK docs.
            consumer_client.receive(starting_position="@latest", **callbacks)
    except KeyboardInterrupt:
        print('Stopped receiving.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment