Skip to content

Instantly share code, notes, and snippets.

@akmamun
Last active September 16, 2023 09:54
Show Gist options
  • Save akmamun/3babb80b228d6009c72b0b806152cab2 to your computer and use it in GitHub Desktop.
Save akmamun/3babb80b228d6009c72b0b806152cab2 to your computer and use it in GitHub Desktop.
Kafka Real-time Event Processing with Django
import json
import logging
from time import sleep
from confluent_kafka import Consumer
from django.core.management import BaseCommand
logger = logging.getLogger(__name__)
# Requires the Kafka client library: pip install confluent-kafka
"""Run this management command with:
python manage.py event_listener
"""
# Passed as `timeout` to Consumer.poll(): the longest a single poll waits
# for a message (confluent_kafka documents this value as seconds).
CONSUMER_TIMEOUT = 5
# Seconds to sleep (time.sleep) after closing a failed consumer before a
# new one is created.
KAFKA_CONSUMER_RECONNECT_TIME = 5
def connect_consumer():
    """Create a new Kafka ``Consumer`` and return it.

    The broker list and group id are the literal strings
    ``"KAFKA_SERVERS_URLS"`` and ``"KAFKA_GROUP_ID"``; they look like
    template placeholders that must be replaced with real values
    (NOTE(review): confirm where the real settings should come from).
    """
    kafka_consumer = Consumer(
        {"bootstrap.servers": "KAFKA_SERVERS_URLS", "group.id": "KAFKA_GROUP_ID"}
    )
    logger.info("started kafka consumer")
    return kafka_consumer
class Command(BaseCommand):
    """Management command that consumes Kafka messages until interrupted.

    A consumer is created with ``connect_consumer()``, subscribed to
    ``SUBSCRIBE_LIST`` and polled in an endless loop.  When subscribing or
    polling raises, the consumer is closed, the command sleeps for
    ``KAFKA_CONSUMER_RECONNECT_TIME`` seconds and then starts over with a
    fresh consumer, so the listener keeps running across connection
    failures.
    """

    help = "Runs the event listener"

    def handle(self, *args, **options):
        """Run the consume/reconnect loop; does not return normally.

        Exceptions raised by ``connect_consumer()`` itself are not caught
        and propagate to the caller.  ``KeyboardInterrupt`` / ``SystemExit``
        also propagate, after the current consumer has been closed.
        """
        # Topics to consume.  Empty in this template -- fill in before use.
        SUBSCRIBE_LIST = []
        reconnecting = False
        while True:
            consumer = connect_consumer()
            if reconnecting:
                logger.info("connection reconnect success")
            try:
                consumer.subscribe(SUBSCRIBE_LIST)
                logger.debug(
                    "subscribed after reconnect" if reconnecting else "subscribed"
                )
                self._consume(consumer)
            except Exception as ex:
                # Top-level boundary: log and fall through to the reconnect
                # path instead of letting the listener die.
                logger.error(repr(ex))
            finally:
                # Runs on interrupts as well, so the consumer is never
                # abandoned without being closed.
                consumer.close()
            logger.error("kafka closed connection")
            sleep(KAFKA_CONSUMER_RECONNECT_TIME)
            reconnecting = True

    def _consume(self, consumer):
        """Poll ``consumer`` forever, decoding each message payload as JSON.

        Messages that carry an error, have no payload, or cannot be decoded
        are logged and skipped so that one bad message does not force a
        reconnect.  Only returns by raising (e.g. when ``poll`` fails).
        """
        while True:
            msg = consumer.poll(timeout=CONSUMER_TIMEOUT)
            if msg is None:
                logger.info("no message found to process")
                continue
            if msg.error():
                # Error events are delivered as messages; their value is
                # not an application payload.
                logger.error("kafka message error: %r", msg.error())
                continue
            logger.debug("message pulled success")
            payload = msg.value()
            if payload is None:
                logger.warning("skipping message without payload")
                continue
            try:
                # UnicodeDecodeError and json.JSONDecodeError are both
                # ValueError subclasses.
                results = json.loads(payload.decode("utf-8"))
            except ValueError as ex:
                logger.error("skipping undecodable message: %r", ex)
                continue
            # Dispatch `results` by topic here, for example:
            # if msg.topic() == 'test_topic_name':
            #     agent_settlement_event.settlement_status_update(results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment