[Download Kafka](https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz)
> tar xzf kafka-<VERSION>.tgz
> cd kafka-<VERSION>
> sbt update
> sbt package
Start single node zookeeper instance
> bin/zookeeper-server-start.sh config/zookeeper.properties
Start kafka server
> bin/kafka-server-start.sh config/server.properties
Create topic
> bin/kafka-topics.sh --list --zookeeper localhost:2181
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Write messages to topic
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is message #1
This is message #2
Read messages from topic
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test
This is message #1
This is message #2
Add dependency
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
</dependencies>
Create a producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
/**
 * Minimal example producer: publishes a fixed number of string messages to the
 * "test" topic on a broker at localhost:9092, using the key "msg" for every record.
 */
public class KafkaMessageProducer {

    /**
     * Builds the producer configuration, sends the messages and then closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<String, Object>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
        try {
            int maxMessages = 1000;
            int count = 0;
            while (count < maxMessages) {
                // count is incremented inside the send expression, so the payload carries
                // the zero-based index while the log line below shows the running total.
                producer.send(new ProducerRecord<String, String>("test", "msg", "message --- #" + count++));
                System.out.println("Message send.." + count);
            }
        } finally {
            // send() only queues the record; closing the producer waits for outstanding
            // sends to complete and releases its background thread and connections.
            // Without this, main could return with records still unsent.
            producer.close();
        }
    }
}