@oleg-agapov
Last active April 14, 2020 16:10
from kafka import KafkaProducer
from kafka.errors import KafkaError
import logging

# DEBUG level makes kafka-python log every connection and protocol step
logging.basicConfig(level=logging.DEBUG)

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic_name = 'raw_data'


def on_send_success(record_metadata):
    # called once the broker has acknowledged the message
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


def on_send_error(excp):
    # called if the send fails
    print("Error callback")
    print(excp)


# produce asynchronously with callbacks
producer.send(topic_name, b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

# block until all async messages are sent
producer.flush()
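
In the run logged below, flush() keeps waiting while the sender repeats "Node 1001 not ready". A possible alternative, shown here only as a minimal sketch, is to block on the future returned by send() with a timeout, so the script gives up instead of waiting indefinitely. The helper name send_and_wait and the 10-second default are hypothetical; the sketch only assumes that the producer behaves like kafka-python's KafkaProducer, whose send() returns a future with a get(timeout=...) method.

```python
def send_and_wait(producer, topic, payload, timeout_s=10):
    """Send one message and wait at most timeout_s seconds for the result.

    Hypothetical helper: `producer` is assumed to behave like kafka-python's
    KafkaProducer, i.e. send() returns a future whose get(timeout=...)
    returns the record metadata or raises on failure or timeout.
    """
    future = producer.send(topic, payload)
    try:
        metadata = future.get(timeout=timeout_s)
    except Exception as exc:  # kafka-python raises KafkaError subclasses here
        print("Send failed or timed out:", exc)
        return None
    # Same three fields that on_send_success prints
    return metadata.topic, metadata.partition, metadata.offset
```

With the producer from the script above, the call would be send_and_wait(producer, topic_name, b'raw_bytes'). A None result means the broker did not acknowledge the message within the timeout.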
Olegs-MacBook-Pro:1_http_gateway_python oagapov$ python http_gateway.py
DEBUG:kafka.producer.kafka:Starting the Kafka producer
DEBUG:kafka.metrics.metrics:Added sensor with name connections-closed
DEBUG:kafka.metrics.metrics:Added sensor with name connections-created
DEBUG:kafka.metrics.metrics:Added sensor with name select-time
DEBUG:kafka.metrics.metrics:Added sensor with name io-time
DEBUG:kafka.client:Initiating connection to node bootstrap-0 at localhost:9092
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent-received
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name request-latency
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.latency
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <disconnected> [unspecified None]>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <disconnected> [IPv6 ('::1', 9092, 0, 0)]>: setting socket option (6, 1, 1)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: established TCP connection
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
DEBUG:kafka.client:Node bootstrap-0 connected
DEBUG:kafka.protocol.parser:Sending request ApiVersionRequest_v0()
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Request 1: ApiVersionRequest_v0()
DEBUG:kafka.protocol.parser:Sending request MetadataRequest_v0(topics=[])
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Request 2: MetadataRequest_v0(topics=[])
DEBUG:kafka.protocol.parser:Received correlation id: 1
DEBUG:kafka.protocol.parser:Processing response ApiVersionResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Response 1 (108.63018035888672 ms): ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=8), (api_key=1, min_version=0, max_version=11), (api_key=2, min_version=0, max_version=5), (api_key=3, min_version=0, max_version=9), (api_key=4, min_version=0, max_version=4), (api_key=5, min_version=0, max_version=2), (api_key=6, min_version=0, max_version=6), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=6), (api_key=10, min_version=0, max_version=3), (api_key=11, min_version=0, max_version=6), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=4), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=3), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=5), (api_key=20, min_version=0, max_version=4), (api_key=21, min_version=0, max_version=1), (api_key=22, min_version=0, max_version=2), (api_key=23, min_version=0, max_version=3), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=2), (api_key=29, min_version=0, max_version=1), (api_key=30, min_version=0, max_version=1), (api_key=31, min_version=0, max_version=1), (api_key=32, min_version=0, max_version=2), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=1), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=1), (api_key=40, min_version=0, max_version=1), 
(api_key=41, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0)])
DEBUG:kafka.protocol.parser:Received correlation id: 2
DEBUG:kafka.protocol.parser:Processing response MetadataResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Response 2 (64.17083740234375 ms): MetadataResponse_v0(brokers=[(node_id=1001, host='130.0.0.20', port=9092)], topics=[(error_code=0, topic='raw_data', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='sessions', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=10, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=20, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=40, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=30, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=9, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=39, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=11, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=31, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=13, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=18, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=22, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=8, leader=1001, 
replicas=[1001], isr=[1001]), (error_code=0, partition=32, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=43, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=29, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=34, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=6, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=41, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=27, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=48, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=5, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=15, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=35, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=25, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=46, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=26, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=36, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=44, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=16, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=37, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=17, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=45, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=24, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=38, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=33, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=23, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=28, 
leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=12, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=19, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=14, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=47, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=49, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=42, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=7, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=21, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='valid_data', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='invalid_data', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001])])])
INFO:kafka.conn:Broker version identified as 1.0.0
INFO:kafka.conn:Set configuration api_version=(1, 0, 0) to skip auto check_version requests on startup
DEBUG:kafka.metrics.metrics:Added sensor with name bufferpool-wait-time
DEBUG:kafka.metrics.metrics:Added sensor with name batch-size
DEBUG:kafka.metrics.metrics:Added sensor with name compression-rate
DEBUG:kafka.metrics.metrics:Added sensor with name queue-time
DEBUG:kafka.metrics.metrics:Added sensor with name produce-throttle-time
DEBUG:kafka.metrics.metrics:Added sensor with name records-per-request
DEBUG:kafka.metrics.metrics:Added sensor with name bytes
DEBUG:kafka.metrics.metrics:Added sensor with name record-retries
DEBUG:kafka.metrics.metrics:Added sensor with name errors
DEBUG:kafka.metrics.metrics:Added sensor with name record-size-max
DEBUG:kafka.producer.sender:Starting Kafka producer I/O thread.
DEBUG:kafka.producer.kafka:Kafka producer started
DEBUG:kafka.client:Sending metadata request MetadataRequest_v1(topics=NULL) to node bootstrap-0
DEBUG:kafka.protocol.parser:Sending request MetadataRequest_v1(topics=NULL)
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Request 3: MetadataRequest_v1(topics=NULL)
DEBUG:kafka.producer.kafka:Requesting metadata update for topic raw_data
DEBUG:kafka.protocol.parser:Received correlation id: 3
DEBUG:kafka.protocol.parser:Processing response MetadataResponse_v1
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Response 3 (9.958028793334961 ms): MetadataResponse_v1(brokers=[(node_id=1001, host='130.0.0.20', port=9092, rack=None)], controller_id=1001, topics=[(error_code=0, topic='raw_data', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='sessions', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='__consumer_offsets', is_internal=True, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=10, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=20, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=40, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=30, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=9, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=39, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=11, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=31, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=13, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=18, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=22, 
leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=8, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=32, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=43, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=29, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=34, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=6, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=41, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=27, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=48, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=5, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=15, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=35, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=25, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=46, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=26, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=36, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=44, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=16, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=37, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=17, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=45, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=24, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=38, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=33, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, 
partition=23, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=28, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=12, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=19, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=14, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=47, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=49, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=42, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=7, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=21, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='valid_data', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='invalid_data', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001])])])
DEBUG:kafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 5, groups: 0)
DEBUG:kafka.producer.kafka:_wait_on_metadata woke after 0.1556079387664795 secs.
DEBUG:kafka.producer.kafka:Sending (key=None value=b'raw_bytes' headers=[]) to TopicPartition(topic='raw_data', partition=4)
DEBUG:kafka.producer.record_accumulator:Allocating a new 16384 byte message buffer for TopicPartition(topic='raw_data', partition=4)
DEBUG:kafka.producer.kafka:Waking up the sender since TopicPartition(topic='raw_data', partition=4) is either full or getting a new batch
DEBUG:kafka.producer.kafka:Flushing accumulated records in producer.
DEBUG:kafka.producer.record_accumulator:Waiting on produce to TopicPartition(topic='raw_data', partition=4)
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.client:Initiating connection to node 1001 at 130.0.0.20:9092
DEBUG:kafka.metrics.metrics:Added sensor with name node-1001.bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name node-1001.bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name node-1001.latency
DEBUG:kafka.conn:<BrokerConnection node_id=1001 host=130.0.0.20:9092 <disconnected> [IPv4 None]>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=1001 host=130.0.0.20:9092 <disconnected> [IPv4 ('130.0.0.20', 9092)]>: setting socket option (6, 1, 1)
INFO:kafka.conn:<BrokerConnection node_id=1001 host=130.0.0.20:9092 <connecting> [IPv4 ('130.0.0.20', 9092)]>: connecting to 130.0.0.20:9092 [('130.0.0.20', 9092) IPv4]
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
DEBUG:kafka.producer.sender:Node 1001 not ready; delaying produce of accumulated batch
WARNING:kafka.producer.record_accumulator:Produced messages to topic-partition TopicPartition(topic='raw_data', partition=4) with base offset -1 and error KafkaTimeoutError: Batch for TopicPartition(topic='raw_data', partition=4) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time.
Error callback
KafkaTimeoutError: Batch for TopicPartition(topic='raw_data', partition=4) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time
WARNING:kafka.producer.record_accumulator:Expired 1 batches in accumulator
ERROR:kafka.conn:Connection attempt to <BrokerConnection node_id=1001 host=130.0.0.20:9092 <connecting> [IPv4 ('130.0.0.20', 9092)]> timed out
WARNING:kafka.producer.record_accumulator:KafkaTimeoutError: Batch for TopicPartition(topic='raw_data', partition=4) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time
INFO:kafka.conn:<BrokerConnection node_id=1001 host=130.0.0.20:9092 <connecting> [IPv4 ('130.0.0.20', 9092)]>: Closing connection. KafkaConnectionError: timeout
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
DEBUG:kafka.conn:<BrokerConnection node_id=1001 host=130.0.0.20:9092 <connecting> [IPv4 ('130.0.0.20', 9092)]>: reconnect backoff 0.056146764840526546 after 1 failures
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
WARNING:kafka.client:Node 1001 connection failed -- refreshing metadata
DEBUG:kafka.producer.kafka:The Kafka producer has closed.
DEBUG:kafka.client:Initializing connection to node 1001 for metadata request