Skip to content

Instantly share code, notes, and snippets.

@grawert
Last active March 19, 2020 10:56
Show Gist options
  • Save grawert/0247392d80c5ca94d34f9eaa11eb6e3c to your computer and use it in GitHub Desktop.
Save grawert/0247392d80c5ca94d34f9eaa11eb6e3c to your computer and use it in GitHub Desktop.
Sniff amq messages by connecting to an exchange with routing key
#!/usr/bin/env python
import sys
import optparse
import logging as log
from kombu import BrokerConnection
from kombu import Exchange
from kombu import Queue
from kombu.mixins import ConsumerMixin
class QueueDump(ConsumerMixin):
def __init__(self, connection, queue_name, exchange_name, routing_key_name):
self.connection = connection
self.queue_name = queue_name
self.exchange_name = exchange_name
self.routing_key_name = routing_key_name
return
def get_consumers(self, consumer, channel):
exchange = Exchange(self.exchange_name, type="topic", durable=False)
queue = Queue(self.queue_name, exchange, routing_key = self.routing_key_name,
durable=False, auto_delete=True, no_ack=False)
return [ consumer(queue, callbacks = [self.on_message]) ]
def on_message(self, body, message):
log.info(body)
def arguments():
description = '%prog -c HOSTNAME:PORT -u USERNAME -p PASSWORD -e EXCHANGE_NAME -k ROUTING_KEY_NAME'
parser = optparse.OptionParser(description)
parser.add_option("-c", "--connect", dest='broker', default="localhost:5672", help='Broker host and port')
parser.add_option("-v", "--virtualhost", dest='virtual_host', default="/", help='Brokers virtual host')
parser.add_option("-u", "--username", dest='username', default="openstack", help='Connection user name')
parser.add_option("-p", "--password", dest='password', default="secret", help='Connection users password')
parser.add_option("-q", "--queue", dest='queue', default="sniffer_dump_queue", help='Temprorary queue name binding to exchange')
parser.add_option("-e", "--exchange", dest='exchange', default="nova", help='Exchange name binding to')
parser.add_option("-k", "--routingkey", dest='routing_key', default="notifications.info", help='Routing key name')
return parser.parse_args()
if __name__ == "__main__":
log.basicConfig(stream=sys.stdout, level=log.DEBUG)
(options, args) = arguments()
uri = "amqp://" + options.username + ":" + options.password + "@" + options.broker + "/" + options.virtual_host
try:
with BrokerConnection(uri) as connection:
QueueDump(connection, options.queue, options.exchange, options.routing_key).run()
except KeyboardInterrupt:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment