Created
August 4, 2022 16:23
-
-
Save alexpearce/fb7b91831363102b5887fde25167165f to your computer and use it in GitHub Desktop.
Real-time consumer of Celery's event queue.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pika | |
#: Exchange that Celery publishes its events to
EXCHANGE_NAME = "celeryev"

#: Name of the transient queue this script declares and consumes from (arbitrary)
QUEUE_NAME = "pikatest"

#: Module-level slot for the open channel object; None until a channel is stored
channel = None
# Step #2
def on_connected(connection):
    """Called once the connection to RabbitMQ is fully open.

    Reports the connection and asks it for a channel; the opened channel is
    handed to ``on_channel_open``.
    """
    print("Connected")
    # Request a channel; the result arrives through the callback
    connection.channel(on_open_callback=on_channel_open)
# Step #3
def on_channel_open(new_channel):
    """Called when the channel has opened; stores it and declares the exchange.

    The channel is kept in the module-level ``channel`` global so that the
    later callbacks, which only receive a response frame, can reach it.
    """
    global channel
    channel = new_channel

    print("Declaring exchange")
    topic_type = pika.exchange_type.ExchangeType.topic
    # Passive declaration: the exchange is looked up, not created by this script
    new_channel.exchange_declare(
        EXCHANGE_NAME,
        exchange_type=topic_type,
        passive=True,
        callback=on_exchange_declared,
    )
# Step #4
def on_exchange_declared(frame: pika.frame.Method):
    """Called when RabbitMQ has answered the exchange declaration.

    Declares the queue this script consumes from as non-durable, exclusive and
    auto-deleting. ``frame`` is the response from RabbitMQ and is not inspected.
    """
    print("Declaring queue")
    channel.queue_declare(
        QUEUE_NAME,
        exclusive=True,
        auto_delete=True,
        durable=False,
        callback=on_queue_declared,
    )
# Step #5
def on_queue_declared(frame: pika.frame.Method):
    """Called when RabbitMQ has answered the queue declaration.

    Binds the queue to the events exchange. ``frame`` is the response from
    RabbitMQ and is not inspected.
    """
    print("Binding queue")
    # Routing key "#": on a topic exchange this pattern matches every routing key
    channel.queue_bind(
        queue=QUEUE_NAME,
        exchange=EXCHANGE_NAME,
        routing_key="#",
        callback=on_queue_bound,
    )
# Step #6
def on_queue_bound(frame: pika.frame.Method):
    """Called when RabbitMQ has answered the queue binding.

    Starts consuming from the queue, routing each message to
    ``handle_delivery``. ``frame`` is the response from RabbitMQ and is not
    inspected.
    """
    print("Consuming from queue")
    channel.basic_consume(
        queue=QUEUE_NAME,
        on_message_callback=handle_delivery,
    )
# Step #7
def handle_delivery(
    channel: pika.channel.Channel,
    method: pika.spec.Basic.Deliver,
    header: pika.spec.BasicProperties,
    body: bytes,
):
    """Called for every message received from RabbitMQ.

    Prints the message body unless it contains the ``worker-heartbeat``
    marker, then acknowledges the message on the channel it was delivered on.
    ``header`` is accepted but not used.
    """
    # Worker heartbeat events are skipped here to keep the log volume down
    is_heartbeat = b"worker-heartbeat" in body
    if not is_heartbeat:
        print("Handling delivery:", body)

    # Acknowledge every delivery, printed or not
    channel.basic_ack(delivery_tag=method.delivery_tag)
# Step #1: Connect to RabbitMQ with an explicit host, port and credentials
credentials = pika.PlainCredentials(
    username="rabbitmquser",
    password="rabbitmqpassword",
)
parameters = pika.ConnectionParameters(
    "hostname",
    5672,
    credentials=credentials,
)
connection = pika.SelectConnection(
    parameters,
    on_open_callback=on_connected,
)

try:
    # Run the I/O loop; all communication with RabbitMQ happens inside it
    connection.ioloop.start()
except KeyboardInterrupt:
    # Ctrl-C: ask for a graceful close of the connection...
    connection.close()
    # ...and run the loop again until the close completes; it stops on its own
    connection.ioloop.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment