# Python Kafka Producer Examples
# Created April 14, 2021 13:33
# Gist mze3e/dd63ff6701db3581c296163c6be45197 (save it to your computer and use it in GitHub Desktop).
# This file contains bidirectional Unicode text that may be interpreted or compiled
# differently than what appears below. To review, open the file in an editor that
# reveals hidden Unicode characters. (Learn more about bidirectional Unicode characters.)
#Example 1 | |
#Project: dino Author: thenetcircle File: kafka.py License: Apache License 2.0 | |
def __init__(self, env, is_external_queue: bool):
    """Set up publishing of events to a Kafka topic.

    The broker host(s) and the topic name are read from ``env.config`` under
    this handler's ``domain_key``. If either is blank, a warning is logged and
    no producer is created, so external publishing stays disabled.

    :param env: environment whose ``config.get`` supplies ``ConfigKeys.HOST``
        (a single host string or a list of brokers) and ``ConfigKeys.QUEUE``
        (the topic name).
    :param is_external_queue: forwarded unchanged to the base class.
    """
    super().__init__(env, is_external_queue, queue_type='kafka', logger=logger)

    eq_host = env.config.get(ConfigKeys.HOST, domain=self.domain_key, default=None)
    eq_queue = env.config.get(ConfigKeys.QUEUE, domain=self.domain_key, default=None)

    # Warnings go to the same module-level ``logger`` used for the info message
    # below, rather than the root logger.
    if not eq_host or (isinstance(eq_host, str) and not eq_host.strip()):
        logger.warning('blank external host specified, not setting up external publishing')
        return
    if eq_queue is None or not eq_queue.strip():
        logger.warning('blank external queue specified, not setting up external publishing')
        return

    # A single host string is wrapped so the join below and bootstrap_servers
    # always see a list.
    if isinstance(eq_host, str):
        eq_host = [eq_host]

    from kafka import KafkaProducer
    import json

    # For kafka, ``queue`` is the topic name and ``queue_connection`` the producer.
    self.queue = eq_queue
    self.queue_connection = KafkaProducer(
        bootstrap_servers=eq_host,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    logger.info('setting up pubsub for type "{}" and host(s) "{}"'.format(self.queue_type, ','.join(eq_host)))
#Example 2 | |
#Project: dino Author: thenetcircle File: kafka.py License: Apache License 2.0 | |
def try_publish(self, message):
    """Publish *message* to the configured Kafka topic.

    The message is first passed through ``self.env.enrichment_manager`` when
    one is configured. A partition key is taken from ``message['target']['id']``,
    falling back to ``message['actor']['id']``, so events about the same entity
    share a key. When neither id is available the message is sent with no key,
    instead of with the literal key ``b'None'``.
    """
    if self.env.enrichment_manager is not None:
        message = self.env.enrichment_manager.handle(message)

    topic_key = None
    # try to get some consistency
    try:
        # ``or dict()`` also covers keys that are present but set to None
        target = message.get('target') or dict()
        topic_key = target.get('id', None)
        if topic_key is None:
            actor = message.get('actor') or dict()
            topic_key = actor.get('id', None)
        if topic_key is not None:
            # kafka publisher can't handle string keys
            topic_key = bytes(str(topic_key), encoding='utf-8')
    except Exception as partition_e:
        # never pass a partially-converted (non-bytes) key to send()
        topic_key = None
        logger.exception(traceback.format_exc())
        environ.env.capture_exception(partition_e)

    # for kafka, the queue_connection is the KafkaProducer and queue is the topic name
    self.queue_connection.send(
        topic=self.queue, value=message, key=topic_key)
#Example 3 | |
#Project: scrapy-cluster Author: istresearch File: rest_service.py License: MIT License | |
def _create_producer(self):
    """Try to establish a Kafka producer connection.

    Values are JSON-encoded and sent as UTF-8 bytes.

    :returns: a ``KafkaProducer``, or ``None`` when the service is closed or
        a required ``KAFKA_*`` setting is missing (the missing key is logged).
    :raises: any other error raised while building the producer, after it
        has been logged.
    """
    if self.closed:
        return None
    try:
        self.logger.debug("Creating new kafka producer using brokers: " +
                          str(self.settings['KAFKA_HOSTS']))
        return KafkaProducer(bootstrap_servers=self.settings['KAFKA_HOSTS'],
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                             retries=3,
                             linger_ms=self.settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
                             buffer_memory=self.settings['KAFKA_PRODUCER_BUFFER_BYTES'])
    except KeyError as e:
        self.logger.error('Missing setting named ' + str(e),
                          {'ex': traceback.format_exc()})
    except Exception:
        self.logger.error("Couldn't initialize kafka producer.",
                          {'ex': traceback.format_exc()})
        raise
    return None
#Example 4 | |
#Project: scrapy-cluster Author: istresearch File: kafka_monitor.py License: MIT License | |
def _create_producer(self):
    """Try to establish a Kafka producer connection.

    Values are JSON-encoded and sent as UTF-8 bytes, because kafka-python
    requires ``value_serializer`` to return bytes.

    :returns: a ``KafkaProducer``, or ``None`` when a required ``KAFKA_*``
        setting is missing (the missing key is logged).
    :raises: any other error raised while building the producer, after it
        has been logged.
    """
    try:
        brokers = self.settings['KAFKA_HOSTS']
        self.logger.debug("Creating new kafka producer using brokers: " +
                          str(brokers))
        return KafkaProducer(bootstrap_servers=brokers,
                             value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                             retries=3,
                             linger_ms=self.settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
                             buffer_memory=self.settings['KAFKA_PRODUCER_BUFFER_BYTES'])
    except KeyError as e:
        self.logger.error('Missing setting named ' + str(e),
                          {'ex': traceback.format_exc()})
    except Exception:
        self.logger.error("Couldn't initialize kafka producer.",
                          {'ex': traceback.format_exc()})
        raise
    return None
#Example 5 | |
#Project: scrapy-cluster Author: istresearch File: kafka_base_monitor.py License: MIT License | |
def _create_producer(self, settings):
    """Try to establish a Kafka producer connection for a plugin.

    Values are JSON-encoded and sent as UTF-8 bytes, because kafka-python
    requires ``value_serializer`` to return bytes.

    :param settings: mapping holding the ``KAFKA_HOSTS`` and
        ``KAFKA_PRODUCER_*`` settings.
    :returns: a ``KafkaProducer``, or ``None`` when a required setting is
        missing (the missing key is logged).
    :raises: any other error raised while building the producer, after it
        has been logged.
    """
    try:
        brokers = settings['KAFKA_HOSTS']
        self.logger.debug("Creating new kafka producer using brokers: " +
                          str(brokers))
        return KafkaProducer(bootstrap_servers=brokers,
                             value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                             retries=3,
                             linger_ms=settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
                             buffer_memory=settings['KAFKA_PRODUCER_BUFFER_BYTES'])
    except KeyError as e:
        self.logger.error('Missing setting named ' + str(e),
                          {'ex': traceback.format_exc()})
    except Exception:
        self.logger.error("Couldn't initialize kafka producer in plugin.",
                          {'ex': traceback.format_exc()})
        raise
    return None
#Example 6 | |
#Project: rasa_core Author: RasaHQ File: broker.py License: Apache License 2.0 | |
def _create_producer(self):
    """Create ``self.producer`` for the configured ``security_protocol``.

    Event values are JSON-encoded and sent as UTF-8 bytes. Supported
    protocols:

    * ``'PLAINTEXT'``: no authentication or encryption;
    * ``'SASL_PLAINTEXT'``: SASL/PLAIN using ``self.sasl_username`` and
      ``self.sasl_password``;
    * ``'SSL'``: TLS using ``self.ssl_cafile``, ``self.ssl_certfile`` and
      ``self.ssl_keyfile``, with hostname verification disabled.

    :raises ValueError: for any other ``security_protocol``, instead of
        silently leaving ``self.producer`` unset.
    """
    import kafka

    producer_config = dict(
        bootstrap_servers=[self.host],
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        security_protocol=self.security_protocol,
    )
    if self.security_protocol == 'SASL_PLAINTEXT':
        producer_config.update(
            sasl_plain_username=self.sasl_username,
            sasl_plain_password=self.sasl_password,
            sasl_mechanism='PLAIN',
        )
    elif self.security_protocol == 'SSL':
        producer_config.update(
            ssl_cafile=self.ssl_cafile,
            ssl_certfile=self.ssl_certfile,
            ssl_keyfile=self.ssl_keyfile,
            # NOTE(review): hostname verification is disabled; confirm this is intended.
            ssl_check_hostname=False,
        )
    elif self.security_protocol != 'PLAINTEXT':
        raise ValueError(
            "Unsupported Kafka security_protocol: {!r}".format(self.security_protocol))
    self.producer = kafka.KafkaProducer(**producer_config)
#Example 7 | |
#Project: webhook-shims Author: vmw-loginsight File: kafkatopic.py License: Apache License 2.0 | |
def kafka(TOPIC=None):
    """Forward the current request body to the Kafka topic *TOPIC*.

    The module-level ``PRODUCER`` is built on the first call and reused on
    later calls. Delivery is awaited for up to 60 seconds.

    :returns: ``("OK", 200, None)`` once the send completes, or
        ``("Internal Server Error", 500, None)`` on ``KafkaTimeoutError``.
    """
    global PRODUCER
    if PRODUCER is None:
        # NOTE(review): SASL options are given without an explicit
        # security_protocol; kafka-python only applies sasl_* settings for
        # SASL_PLAINTEXT / SASL_SSL. Confirm the intended protocol.
        PRODUCER = KafkaProducer(
            bootstrap_servers=KAFKA_BOOSTRAP_SERVERS,
            sasl_mechanism=KAFKA_SASL_MECHANISM,
            sasl_plain_username=KAFKA_USER,
            sasl_plain_password=KAFKA_PASSWORD,
        )

    try:
        PRODUCER.send(TOPIC, request.get_data()).get(timeout=60)
    except KafkaTimeoutError:
        return "Internal Server Error", 500, None
    else:
        return "OK", 200, None
#Example 8 | |
#Project: ozymandias Author: pambot File: ozy_producer.py License: MIT License | |
def main(n):
    """Stream the video for channel *n* into a Kafka producer in an infinite loop.

    The topic comes from ``choose_channel(n)`` and names the ``.mp4`` file read
    from ``DATA``. Each value is converted with ``v.tolist()``, serialized to
    JSON and encoded to UTF-8 bytes, because kafka-python requires
    ``value_serializer`` to return bytes.
    """
    topic = choose_channel(n)
    video_reader = imageio.get_reader(DATA + topic + '.mp4', 'ffmpeg')
    metadata = video_reader.get_meta_data()
    fps = metadata['fps']
    # batch_size and max_request_size are both raised to 15 MiB (15728640 bytes),
    # presumably so whole frames fit in a request.
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             batch_size=15728640,
                             linger_ms=1000,
                             max_request_size=15728640,
                             value_serializer=lambda v: json.dumps(v.tolist()).encode('utf-8'))
    while True:
        video_loop(video_reader, producer, topic, fps)
#Example 9 | |
#Project: rasa-for-botfront Author: botfront File: kafka.py License: Apache License 2.0 | |
def _create_producer(self) -> None:
    """Create ``self.producer`` for the configured ``security_protocol``.

    Event values are JSON-encoded and encoded with ``DEFAULT_ENCODING``.
    Supported protocols:

    * ``"PLAINTEXT"``: no authentication or encryption;
    * ``"SASL_PLAINTEXT"``: SASL/PLAIN using ``self.sasl_username`` and
      ``self.sasl_password``;
    * ``"SSL"``: TLS using ``self.ssl_cafile``, ``self.ssl_certfile`` and
      ``self.ssl_keyfile``, with hostname verification disabled.

    :raises ValueError: for any other ``security_protocol``, instead of
        silently leaving ``self.producer`` unset.
    """
    import kafka

    producer_config = dict(
        bootstrap_servers=[self.host],
        value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
        security_protocol=self.security_protocol,
    )
    if self.security_protocol == "SASL_PLAINTEXT":
        producer_config.update(
            sasl_plain_username=self.sasl_username,
            sasl_plain_password=self.sasl_password,
            sasl_mechanism="PLAIN",
        )
    elif self.security_protocol == "SSL":
        producer_config.update(
            ssl_cafile=self.ssl_cafile,
            ssl_certfile=self.ssl_certfile,
            ssl_keyfile=self.ssl_keyfile,
            # NOTE(review): hostname verification is disabled; confirm this is intended.
            ssl_check_hostname=False,
        )
    elif self.security_protocol != "PLAINTEXT":
        raise ValueError(
            "Unsupported Kafka security_protocol: {!r}".format(self.security_protocol)
        )
    self.producer = kafka.KafkaProducer(**producer_config)
#Example 10 | |
#Project: ColumbiaImageSearch Author: ColumbiaDVMM File: kafka_pusher.py License: Apache License 2.0 | |
def init_producer(self):
    """Create ``self.producer`` and then load the required ``topic_name`` parameter.

    Keyword arguments for ``KafkaProducer`` are collected by
    ``self.get_servers`` (parameter ``'servers'``) and then
    ``self.get_security`` (parameter ``'security'``).

    :raises RuntimeError: if ``KafkaProducer`` cannot be constructed; the
        message includes the collected arguments and the underlying error.
    """
    print("[{}: log] Initializing producer...".format(self.pp))
    producer_kwargs = self.get_security(self.get_servers(dict(), 'servers'), 'security')
    try:
        self.producer = KafkaProducer(**producer_kwargs)
    except Exception as inst:
        # NOTE(review): producer_kwargs may carry security settings; confirm it is
        # acceptable to include them in this error message.
        template = "[{}: ERROR] Could not initialize producer with arguments {}. Error was: {}"
        raise RuntimeError(template.format(self.pp, producer_kwargs, inst))
    self.topic_name = self.get_required_param("topic_name")
#Example 11 | |
#Project: karapace Author: aiven File: karapace.py License: Apache License 2.0 | |
def _create_producer(self):
    """Create a KafkaProducer from ``self.config``, retrying until it succeeds.

    Every failure is logged with its traceback and retried after one second.
    Only ``Exception`` subclasses are retried, so ``KeyboardInterrupt`` and
    ``SystemExit`` still stop the loop.
    """
    while True:
        try:
            return KafkaProducer(
                bootstrap_servers=self.config["bootstrap_uri"],
                security_protocol=self.config["security_protocol"],
                ssl_cafile=self.config["ssl_cafile"],
                ssl_certfile=self.config["ssl_certfile"],
                ssl_keyfile=self.config["ssl_keyfile"],
                api_version=(1, 0, 0),
                metadata_max_age_ms=self.config["metadata_max_age_ms"],
                max_block_ms=2000  # missing topics will block unless we cache cluster metadata and pre-check
            )
        except Exception:  # pylint: disable=broad-except
            self.log.exception("Unable to create producer, retrying")
            time.sleep(1)
#Example 12 | |
#Project: pyspark-twitter-stream-mining Author: amir-rahnama File: twitter_stream.py License: MIT License | |
def __init__(self):
    """Create a JSON producer for ``docker:9092`` and an empty tweet buffer.

    kafka-python requires ``value_serializer`` to return bytes, so the JSON
    text is encoded as UTF-8.
    """
    self.producer = KafkaProducer(bootstrap_servers='docker:9092',
                                  value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    self.tweets = []
#Example 13 | |
#Project: scrapy-cluster Author: istresearch File: pipelines.py License: MIT License | |
def from_settings(cls, settings):
    """Build the pipeline from Scrapy *settings*.

    Configures the scrapy-cluster logger from the ``SC_LOG_*`` settings and
    creates a KafkaProducer from the ``KAFKA_*`` settings. Both are passed to
    ``cls`` together with the topic prefix, app-id and base64 options.

    If the producer cannot be created, the error and its traceback are
    logged and the process exits with status 1.
    """
    import traceback

    my_level = settings.get('SC_LOG_LEVEL', 'INFO')
    my_name = settings.get('SC_LOGGER_NAME', 'sc-logger')
    my_output = settings.get('SC_LOG_STDOUT', True)
    my_json = settings.get('SC_LOG_JSON', False)
    my_dir = settings.get('SC_LOG_DIR', 'logs')
    my_bytes = settings.get('SC_LOG_MAX_BYTES', '10MB')
    my_file = settings.get('SC_LOG_FILE', 'main.log')
    my_backups = settings.get('SC_LOG_BACKUPS', 5)
    my_appids = settings.get('KAFKA_APPID_TOPICS', False)
    logger = LogFactory.get_instance(json=my_json,
                                     name=my_name,
                                     stdout=my_output,
                                     level=my_level,
                                     dir=my_dir,
                                     file=my_file,
                                     bytes=my_bytes,
                                     backups=my_backups)
    try:
        producer = KafkaProducer(bootstrap_servers=settings['KAFKA_HOSTS'],
                                 retries=3,
                                 linger_ms=settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
                                 buffer_memory=settings['KAFKA_PRODUCER_BUFFER_BYTES'])
    except Exception:
        # Include the traceback so the cause of the failure is not lost.
        logger.error("Unable to connect to Kafka in Pipeline"
                     ", raising exit flag.", {'ex': traceback.format_exc()})
        # this is critical so we choose to exit.
        # exiting because this is a different thread from the crawlers
        # and we want to ensure we can connect to Kafka when we boot
        sys.exit(1)
    topic_prefix = settings['KAFKA_TOPIC_PREFIX']
    use_base64 = settings['KAFKA_BASE_64_ENCODE']
    return cls(producer, topic_prefix, logger, appids=my_appids,
               use_base64=use_base64)
#Example 14 | |
#Project: orion-server Author: LINKIWI File: stream.py License: MIT License | |
def __init__(self, kafka_addr, kafka_topic):
    """
    Client for producing location messages to a Kafka broker.

    :param kafka_addr: Address to the Kafka broker. When falsy, a
                       ``NoopProducer`` is used so nothing is published.
    :param kafka_topic: Name of the Kafka topic to which messages should be published.
    """
    # Bypass event publishing entirely when no broker address is specified.
    producer_factory = kafka.KafkaProducer if kafka_addr else NoopProducer
    self.topic = kafka_topic
    self.producer = producer_factory(
        bootstrap_servers=kafka_addr,
        # kafka-python requires value_serializer to return bytes, not str.
        value_serializer=lambda value: json.dumps(value).encode('utf-8'),
    )
#Example 15 | |
#Project: network-programmability-stream Author: dmfigol File: nc_dial_in_subscribe.py License: MIT License | |
def kafka_connect(self):
    """Attach a KafkaProducer for ``KAFKA_URL`` to ``self.producer``.

    Values are serialized to JSON and encoded as UTF-8 bytes.
    """
    def encode_json(value):
        return json.dumps(value).encode('utf-8')

    self.producer = KafkaProducer(bootstrap_servers=KAFKA_URL, value_serializer=encode_json)
#Example 16 | |
#Project: sniffer Author: threathunterX File: sniffer_nebula_test.py License: Apache License 2.0 | |
def __init__(self, bootstrap_servers, kafkatopic):
    """Remember the destination topic and open a producer.

    :param bootstrap_servers: passed straight to ``KafkaProducer``.
    :param kafkatopic: topic name, stored as ``self.kafkatopic``.

    No serializers are configured, so callers presumably send values that
    are already bytes; confirm against the callers.
    """
    self.kafkatopic = kafkatopic
    producer_options = {'bootstrap_servers': bootstrap_servers}
    self.producer = KafkaProducer(**producer_options)
#Example 17 | |
#Project: search-MjoLniR Author: wikimedia File: client.py License: MIT License | |
def _make_producer(client_config):
    """Return a KafkaProducer for ``client_config.brokers`` using gzip compression."""
    producer_class = kafka.KafkaProducer
    return producer_class(compression_type='gzip',
                          bootstrap_servers=client_config.brokers)
#Example 18 | |
#Project: monasca-analytics Author: openstack File: kafkas.py License: Apache License 2.0 | |
def sink_dstream(self, dstream):
    """Register ``self._persist`` to run on every RDD of *dstream*.

    The Kafka producer is created lazily on first use and points at
    ``self._host:self._port``.
    """
    if self._producer is None:
        broker_address = "{0}:{1}".format(self._host, self._port)
        self._producer = kafka.KafkaProducer(bootstrap_servers=broker_address)
    dstream.foreachRDD(self._persist)
#Example 19 | |
#Project: Gather-Deployment Author: huseinzol05 File: producer.py License: MIT License | |
def connect_kafka_producer():
    """Connect to the Kafka broker at ``localhost:9092`` and return the producer.

    :returns: a ``KafkaProducer`` (API version 0.10), or ``None`` if the
        producer could not be created; the error is printed in that case.
    """
    print('connecting to kafka')
    _producer = None
    try:
        _producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'], api_version=(0, 10)
        )
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    else:
        # Report success only when the producer was actually created.
        print('successfully connected to kafka')
    return _producer
#Example 20 | |
#Project: Gather-Deployment Author: huseinzol05 File: main.py License: MIT License | |
def connect_kafka_producer():
    """Connect to the Kafka broker at ``kafka:9092`` and return the producer.

    Records are spread across partitions with ``RoundRobinPartitioner``.

    :returns: a ``KafkaProducer`` (API version 0.10), or ``None`` if the
        producer could not be created; the error is printed in that case.
    """
    print('connecting to kafka')
    _producer = None
    try:
        _producer = KafkaProducer(
            bootstrap_servers=['kafka:9092'],
            api_version=(0, 10),
            partitioner=RoundRobinPartitioner(),
        )
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    else:
        # Report success only when the producer was actually created.
        print('successfully connected to kafka')
    return _producer
#Example 21 | |
#Project: ztag Author: zmap File: stream.py License: Apache License 2.0 | |
def __init__(self, logger=None, destination=None, *args, **kwargs):
    """Create the Kafka producers for the given scan *destination*.

    :param destination: ``"full_ipv4"`` (publishes to topic ``"ipv4"``) or
        ``"alexa_top1mil"`` (publishes to topic ``"domain"``).
    :raises ValueError: for any other destination. ValueError subclasses
        Exception, so existing ``except Exception`` handlers still apply.

    The bootstrap host is read from the ``KAFKA_BOOTSTRAP_HOST`` environment
    variable and defaults to ``localhost:9092``. ``logger``, ``*args`` and
    ``**kwargs`` are accepted but not used here.
    """
    topics = {"full_ipv4": "ipv4", "alexa_top1mil": "domain"}
    # Validate before importing kafka so a bad destination fails fast.
    if destination not in topics:
        raise ValueError("invalid destination: %s" % destination)
    from kafka import KafkaProducer
    self.topic = topics[destination]
    host = os.environ.get('KAFKA_BOOTSTRAP_HOST', 'localhost:9092')
    self.main_producer = KafkaProducer(bootstrap_servers=host)
    self.cert_producer = KafkaProducer(bootstrap_servers=host)
#Example 22 | |
#Project: distributed_framework Author: ydf0509 File: kafka_publisher.py License: Apache License 2.0 | |
def custom_init(self):
    """Create the producer and make sure the queue's topic exists.

    The topic ``self._queue_name`` is created with 16 partitions and a
    replication factor of 1. A topic that already exists is not an error;
    any other failure is logged. ``self.close`` is registered with ``atexit``.
    """
    self._producer = KafkaProducer(bootstrap_servers=frame_config.KAFKA_BOOTSTRAP_SERVERS)
    admin_client = None
    try:
        admin_client = KafkaAdminClient(bootstrap_servers=frame_config.KAFKA_BOOTSTRAP_SERVERS)
        admin_client.create_topics([NewTopic(self._queue_name, 16, 1)])
        # admin_client.create_partitions({self._queue_name: NewPartitions(total_count=16)})
    except TopicAlreadyExistsError:
        pass
    except Exception as e:
        self.logger.exception(e)
    finally:
        # The admin client is only needed for topic creation; close it so its
        # connections are not leaked.
        if admin_client is not None:
            admin_client.close()
    # If the producer is not closed explicitly before the program exits, an error is raised.
    atexit.register(self.close)
#Example 23 | |
#Project: distributed_framework Author: ydf0509 File: log_manager000.py License: Apache License 2.0 | |
def __init__(self, bootstrap_servers, **configs):
    """
    Logging handler backed by a KafkaProducer shared across instances.

    :param bootstrap_servers: Kafka bootstrap server(s) for the shared producer.
    :param configs: extra keyword arguments passed to ``KafkaProducer``.

    The producer is stored on the class and only created if it is not set
    yet. Every instance starts its own daemon thread running
    ``self._do_bulk_op``.
    """
    logging.Handler.__init__(self)
    if not self.__class__.kafka_producer:
        very_nb_print('实例化kafka producer')  # "instantiating kafka producer"
        self.__class__.kafka_producer = KafkaProducer(bootstrap_servers=bootstrap_servers, **configs)
    # Thread.setDaemon() is deprecated since Python 3.10; pass daemon= instead.
    t = Thread(target=self._do_bulk_op, daemon=True)
    t.start()
#Example 24 | |
#Project: distributed_framework Author: ydf0509 File: kafka_consumer.py License: Apache License 2.0 | |
def _shedual_task(self):
    """Consume ``self._queue_name`` and submit every message as a task.

    Also (re)creates ``self._producer``. Messages are read by consumer group
    ``'frame_group'`` with auto-commit enabled and logged at debug level.
    Each one is then handed to ``self._submit_task`` together with the
    consumer and the JSON-decoded body.
    """
    servers = frame_config.KAFKA_BOOTSTRAP_SERVERS
    self._producer = KafkaProducer(bootstrap_servers=servers)
    # REMIND: consumption is highly concurrent, with many threads but few partitions,
    # so offsets are auto-committed. Otherwise several threads committing offsets for
    # the same partition would race ahead of each other and the ordering would be meaningless.
    # REMIND: if you need strong reliability and consistency, use RabbitMQ.
    # REMIND: the upside is high concurrency. A topic is like a book: the offset can be
    # reset at any time to re-consume. Several groups can consume the same topic, and
    # each group's offsets on that topic are independent of the others.
    consumer = OfficialKafkaConsumer(self._queue_name, bootstrap_servers=servers,
                                     group_id='frame_group', enable_auto_commit=True)
    for message in consumer:
        # Note: message.value is raw bytes and has to be decoded.
        self.logger.debug(
            f'从kafka的 [{message.topic}] 主题,分区 {message.partition} 中 取出的消息是: {message.value.decode()}')
        task_kwargs = {'consumer': consumer, 'message': message, 'body': json.loads(message.value)}
        self._submit_task(task_kwargs)
#Example 25 | |
#Project: parsedmarc Author: domainaware File: kafkaclient.py License: Apache License 2.0 | |
def __init__(self, kafka_hosts, ssl=False, username=None,
             password=None, ssl_context=None):
    """
    Initializes the Kafka client

    Args:
        kafka_hosts (list): A list of Kafka hostnames
        (with optional port numbers)
        ssl (bool): Use a SSL/TLS connection
        username (str): An optional username
        password (str): An optional password
        ssl_context: SSL context options

    Raises:
        KafkaError: if no Kafka brokers are available

    Notes:
        ``ssl=True`` is implied when a username or password are
        supplied. Credentials are sent with SASL/PLAIN over TLS
        (``security_protocol="SASL_SSL"``).

        When using Azure Event Hubs, the username is literally
        ``$ConnectionString``, and the password is the
        Azure Event Hub connection string.
    """
    config = dict(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                  bootstrap_servers=kafka_hosts,
                  client_id="parsedmarc-{0}".format(__version__))
    if ssl or username or password:
        config["security_protocol"] = "SSL"
        config["ssl_context"] = ssl_context or create_default_context()
        if username or password:
            # kafka-python only uses sasl_plain_* credentials with a SASL
            # security protocol; with plain "SSL" they would be ignored.
            config["security_protocol"] = "SASL_SSL"
            config["sasl_mechanism"] = "PLAIN"
            config["sasl_plain_username"] = username or ""
            config["sasl_plain_password"] = password or ""
    try:
        self.producer = KafkaProducer(**config)
    except NoBrokersAvailable as error:
        raise KafkaError("No Kafka brokers available") from error
#Example 26 | |
#Project: rasa_core Author: RasaHQ File: broker.py License: Apache License 2.0 | |
def from_endpoint_config(cls, broker_config) -> Optional['KafkaProducer']:
    """Build an instance from an endpoint configuration, if one is given.

    :param broker_config: object exposing ``url`` and ``kwargs``, or ``None``
        when no broker is configured.
    :returns: ``cls(broker_config.url, **broker_config.kwargs)``, or ``None``.
    """
    if broker_config is not None:
        return cls(broker_config.url, **broker_config.kwargs)
    return None
#Example 27 | |
#Project: rafiki Author: nginyc File: inference_cache.py License: Apache License 2.0 | |
def __init__(self, hosts=os.environ.get('KAFKA_HOST', 'localhost'), ports=os.environ.get('KAFKA_PORT', 9092)):
    """Create a producer for the given comma-separated hosts and ports.

    :param hosts: comma-separated host names, e.g. ``"a,b"``.
    :param ports: comma-separated ports, or a single port given as ``int`` or
        ``str``. The ``int`` default 9092 used to crash on ``.split``. A single
        port is applied to every host; otherwise there must be one port per host.
    :raises ValueError: if several ports are given and their count differs
        from the number of hosts (previously extra hosts were silently dropped).

    Note: the defaults read ``KAFKA_HOST`` / ``KAFKA_PORT`` when the function
    is defined, not on every call.
    """
    hostlist = [host.strip() for host in str(hosts).split(',')]
    portlist = [port.strip() for port in str(ports).split(',')]
    if len(portlist) == 1:
        portlist = portlist * len(hostlist)
    elif len(portlist) != len(hostlist):
        raise ValueError(
            f'got {len(hostlist)} Kafka hosts but {len(portlist)} ports: {hosts!r} / {ports!r}')
    self.connection_url = [f'{host}:{port}' for host, port in zip(hostlist, portlist)]
    self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
#Example 28 | |
#Project: platypush Author: BlackLight File: kafka.py License: MIT License | |
def send_message(self, msg, topic, server=None, **kwargs):
    """
    :param msg: Message to send - as a string, bytes stream, JSON, Platypush message, dictionary, or anything that implements ``__str__``
    :param topic: Kafka topic to publish the message to
    :param server: Kafka server name or address + port (format: ``host:port``). If None, then the default server will be used
    :type server: str
    :raises RuntimeError: if no server is given and no default can be found
    """
    from kafka import KafkaProducer

    if not server:
        if not self.server:
            try:
                kafka_backend = get_backend('kafka')
                server = kafka_backend.server
            except Exception as e:
                raise RuntimeError('No Kafka server nor default server specified') from e
        else:
            server = self.server

    if isinstance(msg, (dict, list)):
        msg = json.dumps(msg)
    # Bytes are sent as-is; str() on bytes would produce "b'...'".
    if not isinstance(msg, (bytes, bytearray)):
        msg = str(msg).encode('utf-8')

    # A new producer is created for every call; close it afterwards so its
    # connections and background I/O thread are released.
    producer = KafkaProducer(bootstrap_servers=server)
    try:
        producer.send(topic, msg)
        producer.flush()
    finally:
        producer.close()
# vim:sw=4:ts=4:et: | |
#Example 29 | |
#Project: py-timeexecution Author: kpn-digital File: kafka.py License: Apache License 2.0 | |
def producer(self):
    """
    Return the KafkaProducer, creating it on first access.

    Each value is serialized with a fresh ``self._serializer_class()``
    instance and encoded as UTF-8. ``self._kwargs`` are passed through to
    the producer.

    :raises: kafka.errors.NoBrokersAvailable if the connection is broken
    """
    if not self._producer:
        def serialize(value):
            return self._serializer_class().dumps(value).encode('utf-8')

        self._producer = KafkaProducer(
            bootstrap_servers=self.hosts,
            value_serializer=serialize,
            **self._kwargs
        )
    return self._producer
#Example 30 | |
#Project: stoq-plugins-public Author: PUNCH-Cyber File: kafka-queue.py License: Apache License 2.0 | |
def _connect(self) -> None:
    """
    Lazily create the KafkaProducer used to publish messages.

    Does nothing when ``self.producer`` is already set. Otherwise connects to
    ``self.servers`` with ``self.retries`` retries.
    """
    if self.producer:
        return
    self.producer = KafkaProducer(bootstrap_servers=self.servers, retries=self.retries)
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.