Last active
August 29, 2015 14:04
-
-
Save Fluxx/52950530322c09267c5d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/main/java/com/pinterest/secor/common/KafkaClient.java b/src/main/java/com/pinterest/secor/common/KafkaClient.java | |
index 3fe96a2..d7061dc 100644 | |
--- a/src/main/java/com/pinterest/secor/common/KafkaClient.java | |
+++ b/src/main/java/com/pinterest/secor/common/KafkaClient.java | |
@@ -131,8 +131,11 @@ public class KafkaClient { | |
ByteBuffer payload = messageAndOffset.message().payload(); | |
byte[] payloadBytes = new byte[payload.limit()]; | |
payload.get(payloadBytes); | |
+ ByteBuffer key = messageAndOffset.message().key(); | |
+ byte[] keyBytes = new byte[key.limit()]; | |
+ key.get(keyBytes); | |
return new Message(topicPartition.getTopic(), topicPartition.getPartition(), | |
- messageAndOffset.offset(), payloadBytes); | |
+ messageAndOffset.offset(), keyBytes, payloadBytes); | |
} | |
private SimpleConsumer createConsumer(TopicPartition topicPartition) { | |
diff --git a/src/main/java/com/pinterest/secor/message/Message.java b/src/main/java/com/pinterest/secor/message/Message.java | |
index c50ba94..98681ca 100644 | |
--- a/src/main/java/com/pinterest/secor/message/Message.java | |
+++ b/src/main/java/com/pinterest/secor/message/Message.java | |
@@ -29,12 +29,14 @@ public class Message { | |
private String mTopic; | |
private int mKafkaPartition; | |
private long mOffset; | |
+ private byte[] mKey; | |
private byte[] mPayload; | |
protected String fieldsToString() { | |
return "topic='" + mTopic + '\'' + | |
", kafkaPartition=" + mKafkaPartition + | |
", offset=" + mOffset + | |
+ ", key=" + mKey + | |
", payload=" + new String(mPayload); | |
} | |
@@ -44,10 +46,12 @@ public class Message { | |
return "Message{" + fieldsToString() + '}'; | |
} | |
- public Message(String topic, int kafkaPartition, long offset, byte[] payload) { | |
+ public Message(String topic, int kafkaPartition, long offset, byte[] key, | |
+ byte[] payload) { | |
mTopic = topic; | |
mKafkaPartition = kafkaPartition; | |
mOffset = offset; | |
+ mKey = key; | |
mPayload = payload; | |
} | |
@@ -64,6 +68,10 @@ public class Message { | |
return mOffset; | |
} | |
+ public byte[] getKey() { | |
+ return mKey; | |
+ } | |
+ | |
public byte[] getPayload() { | |
return mPayload; | |
} | |
diff --git a/src/main/java/com/pinterest/secor/reader/MessageReader.java b/src/main/java/com/pinterest/secor/reader/MessageReader.java | |
index 6fd0b38..f1158b8 100644 | |
--- a/src/main/java/com/pinterest/secor/reader/MessageReader.java | |
+++ b/src/main/java/com/pinterest/secor/reader/MessageReader.java | |
@@ -123,7 +123,8 @@ public class MessageReader { | |
RateLimitUtil.acquire(); | |
MessageAndMetadata<byte[], byte[]> kafkaMessage = mIterator.next(); | |
Message message = new Message(kafkaMessage.topic(), kafkaMessage.partition(), | |
- kafkaMessage.offset(), kafkaMessage.message()); | |
+ kafkaMessage.offset(), kafkaMessage.key(), | |
+ kafkaMessage.message()); | |
TopicPartition topicPartition = new TopicPartition(message.getTopic(), | |
message.getKafkaPartition()); | |
updateAccessTime(topicPartition); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment