Skip to content

Instantly share code, notes, and snippets.

@farukyildiz
Created June 14, 2023 09:30
Show Gist options
  • Save farukyildiz/2a94da7ec460b8890653d71e706ce967 to your computer and use it in GitHub Desktop.
Save farukyildiz/2a94da7ec460b8890653d71e706ce967 to your computer and use it in GitHub Desktop.
message queue setup
import sys

from confluent_kafka import Consumer, KafkaError, KafkaException
# Consumer client configuration.
conf = {
    # Address of the Kafka broker; "my_ip" looks like a placeholder for the
    # host's reachable address -- replace before running.
    'bootstrap.servers': "my_ip:9092",
    # A consumer that calls subscribe() must belong to a consumer group.
    # The id below is a placeholder; pick one that identifies this application.
    'group.id': "main-topic-consumer",
    # Where to start when the group has no committed offset yet
    # ('smallest' is librdkafka's alias for 'earliest').
    'auto.offset.reset': 'smallest',
}
consumer = Consumer(conf)
def consume_loop(consumer, topics):
    """Subscribe to *topics* and print every message value, forever.

    Polls ``consumer`` in an endless loop with a one-second timeout.
    End-of-partition events are reported on stderr; any other error event is
    raised as a ``KafkaException``. The consumer is closed on the way out,
    whatever the reason for leaving the loop.

    Args:
        consumer: Object exposing ``subscribe``, ``poll`` and ``close``
            (the script passes a ``confluent_kafka.Consumer``).
        topics: Topic names handed to ``consumer.subscribe``.

    Raises:
        KafkaException: If a polled message carries an error other than
            ``KafkaError._PARTITION_EOF``.
    """
    try:
        consumer.subscribe(topics)

        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Poll timed out without delivering anything; try again.
                continue

            error = msg.error()
            if not error:
                print(msg.value())
                continue

            if error.code() != KafkaError._PARTITION_EOF:
                raise KafkaException(error)

            # End of partition event
            sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                             (msg.topic(), msg.partition(), msg.offset()))
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
# Script entry point: run the polling loop on the single topic this demo uses.
consume_loop(consumer, topics=["main.topic"])
# docker-compose definition for a single Kafka broker backed by ZooKeeper.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.2
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      # "my_ip" looks like a placeholder: replace it with the address clients
      # use to reach this host, matching the clients' bootstrap.servers value.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://my_ip:9092,PLAINTEXT_INTERNAL://broker:29092
      # Replication settings of 1 because there is only one broker service here.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
# Insert a rule at the top of the INPUT chain accepting TCP packets whose
# destination port is 9092.
iptables --insert INPUT --protocol tcp --destination-port 9092 --jump ACCEPT
# Insert the matching rule at the top of the OUTPUT chain for TCP packets
# whose destination port is 9092.
iptables --insert OUTPUT --protocol tcp --destination-port 9092 --jump ACCEPT
from confluent_kafka import Producer
import socket
# Producer client configuration; "my_ip" looks like a placeholder for the
# broker host's reachable address.
conf = {
    "bootstrap.servers": "my_ip:9092",
}
producer = Producer(conf)

# Queue a single key/value record for the demo topic...
producer.produce("main.topic", value="value", key="key")
# ...and flush the producer before the script exits.
producer.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment