@javabrett
Last active May 4, 2020 12:24

Docker image installing Confluent Kafka Python client

Build image

docker build -t confluent-kafka-python-produce-demo .

Run container

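# Set these for your cluster. For Confluent Cloud, SASL_USERNAME and
# SASL_PASSWORD are the cluster API key and API secret.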
export BOOTSTRAP_SERVERS=pkc-....confluent.cloud:9092
export SASL_USERNAME=
export SASL_PASSWORD=
export TOPIC=foo

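# Quote the values so that secrets containing spaces or shell metacharacters survive intact
docker run -it --rm \
  -e BOOTSTRAP_SERVERS="${BOOTSTRAP_SERVERS}" \
  -e SASL_USERNAME="${SASL_USERNAME}" \
  -e SASL_PASSWORD="${SASL_PASSWORD}" \
  -e TOPIC="${TOPIC}" \
  confluent-kafka-python-produce-demo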
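
The variables passed to the container above are read by the producer script through os.environ, so an unset one surfaces only as a bare KeyError at startup. A minimal sketch of a pre-flight check follows; the helper name missing_vars and the REQUIRED_VARS tuple are hypothetical and not part of the gist, and the variable names are taken from the docker run command.

```python
import os

# Variable names taken from the docker run command; this tuple is an assumption
# about which ones the script cannot run without.
REQUIRED_VARS = ("BOOTSTRAP_SERVERS", "SASL_USERNAME", "SASL_PASSWORD", "TOPIC")


def missing_vars(env=os.environ, required=REQUIRED_VARS):
    """Return the required variable names that are unset or empty in env."""
    return [name for name in required if not env.get(name)]
```

One possible use is to call missing_vars() at the top of the script and exit with a readable message listing the returned names, rather than letting the first os.environ lookup fail.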

Dockerfile

FROM ubuntu:18.04
ENV SASL_MECHANISMS="PLAIN"
ENV SECURITY_PROTOCOL="SASL_SSL"
RUN apt-get update && \
    apt-get install -y python3 python3-pip curl vim
RUN pip3 install -U confluent-kafka
COPY *.py ./
CMD python3 producer.py

producer.py

#!/usr/bin/env python3
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# =============================================================================
#
# Produce messages to Confluent Cloud
# Using Confluent Python Client for Apache Kafka
#
# =============================================================================
from confluent_kafka import Producer, KafkaError
import json
import os

if __name__ == '__main__':

    # Initialization
    # Create Producer instance
    p = Producer({
        'bootstrap.servers': os.environ['BOOTSTRAP_SERVERS'],
        'sasl.mechanisms': os.environ['SASL_MECHANISMS'],
        'security.protocol': os.environ['SECURITY_PROTOCOL'],
        'sasl.username': os.environ['SASL_USERNAME'],
        'sasl.password': os.environ['SASL_PASSWORD'],
    })
    topic = os.environ['TOPIC']

    delivered_records = 0

    # Optional per-message on_delivery handler (triggered by poll() or flush())
    # when a message has been successfully delivered or
    # permanently failed delivery (after retries).
    def acked(err, msg):
        """Delivery report handler called on
        successful or failed delivery of message
        """
        global delivered_records
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}"
                  .format(msg.topic(), msg.partition(), msg.offset()))

    for n in range(10):
        record_key = "alice"
        record_value = json.dumps({'count': n})
        print("Producing record: {}\t{}".format(record_key, record_value))
        p.produce(topic, key=record_key, value=record_value, on_delivery=acked)
        # p.poll() serves delivery reports (on_delivery)
        # from previous produce() calls.
        p.poll(0)

    # Wait up to 10 seconds for outstanding messages to be delivered
    p.flush(10)

    print("{} messages were produced to topic {}!".format(delivered_records, topic))
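
The script counts deliveries through a module-level global that the acked handler mutates. As an alternative sketch (not the gist author's approach), the same bookkeeping can live in a small callable object. The class name DeliveryCounter is hypothetical; the only assumption about the client is that on_delivery accepts any callable invoked as callback(err, msg), which is how acked is used above.

```python
class DeliveryCounter:
    """Callable delivery-report handler that tallies successes and failures."""

    def __init__(self):
        self.delivered = 0
        self.failed = 0

    def __call__(self, err, msg):
        # err is None when the message was delivered; otherwise it describes
        # the permanent failure. msg is not inspected in this sketch.
        if err is not None:
            self.failed += 1
        else:
            self.delivered += 1
```

To use it, create counter = DeliveryCounter() before the loop, pass on_delivery=counter to p.produce(), and read counter.delivered and counter.failed after p.flush(10) returns. This also surfaces the number of failed deliveries, which the original summary line does not report.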