Created
February 7, 2022 19:53
-
-
Save jwkidd3/536449cb6b76f74ceae0a6c121b278a7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kiddcorp; | |
import java.util.Properties; | |
import org.apache.kafka.clients.producer.Callback; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.ProducerConfig; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.clients.producer.RecordMetadata; | |
import org.apache.kafka.common.serialization.StringSerializer; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class ProducerDemoWithCallback { | |
public static void main(String[] args) { | |
Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class); | |
String bootstrapServers = "localhost:9092"; | |
String topic = "testTopic"; | |
//create producer properties | |
Properties properties = new Properties(); | |
properties.put("acks", "all"); | |
properties.put("retries", 0); | |
properties.put("linger.ms", 1); | |
properties.put("enable.indempotence","True"); | |
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | |
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | |
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | |
//create producer | |
KafkaProducer<String, String> producer = new KafkaProducer<>(properties); | |
for(int i=0; i<10; i++) { | |
//create a producer record | |
String key = String.format("Key %d", i); | |
String message = String.format("CallBack MSG %s", key); | |
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message); | |
//send data | |
producer.send(record, new Callback() { | |
//executes every time a record is successfully sent or an exception is thrown | |
@Override | |
public void onCompletion(RecordMetadata recordMetadata, Exception e) { | |
if (e == null) { | |
System.out.printf("Received new metadata:\nTopic: %s\nPartition: " | |
+ "%s\nOffset: %s\nTimestamp: %s\n", | |
recordMetadata.topic(), recordMetadata.partition(), | |
recordMetadata.offset(), recordMetadata.timestamp()); | |
} else { | |
logger.error("Error while producing.", e); | |
} | |
} | |
}); | |
producer.flush(); | |
} | |
producer.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment