Created
December 13, 2016 16:32
-
-
Save elakito/8c33dd318def85f6544a3ba799e34c36 to your computer and use it in GitHub Desktop.
sarama_cluster's publisher subscriber samples to test partition assignment
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"fmt" | |
"log" | |
"strings" | |
"time" | |
"github.com/Shopify/sarama" | |
"github.com/bsm/sarama-cluster" | |
) | |
// A sample program to show the partition claiming behavior against kafka 0.9 or 0.10.
// This sample program assumes a kafka instance is available at localhost:9092.
// This program either publishes, subscribes, or does both on topic "testTopic_n", where n
// is the specified random identifier suffix. The consumer part uses group Id "testGroup_n".
// | |
// The consumer is supposed to claim the partition when messages are published to the above topic, | |
// no matter how the order of subscription and publishing occurs. However, it seems the consumer | |
// fails to claim the partition when it subscribes to the topic first and then some messages | |
// are published. In other words, when the consumer subscribes to a previously non existent topic | |
// and the publisher publishes messages later, the consumer fails to claim the partition. When | |
// the consumer is restarted, it succeeds to claim the partition. The following steps will show | |
// this behavior | |
// 1. working case where the consumer succeeds to claim the partition
// (assuming 145 is a previously unused suffix)
// 1.1 publish some messages to test topic 145 | |
// [Console 1] | |
// $ go run sc_subtest.go -rnd 145 -usepub | |
// 2016/12/13 17:16:41 Creating a producer for topic testTopic_145 ... | |
// 2016/12/13 17:16:42 Message Hello 0 published to partition 0 at offset 0 | |
// 2016/12/13 17:16:42 Message Hello 1 published to partition 0 at offset 1 | |
// 2016/12/13 17:16:42 Message Hello 2 published to partition 0 at offset 2 | |
// 2016/12/13 17:16:42 Message Hello 3 published to partition 0 at offset 3 | |
// 2016/12/13 17:16:42 Message Hello 4 published to partition 0 at offset 4 | |
// 2016/12/13 17:16:42 Published: 5 | |
// $ | |
// 1.2 subscribe to test topic 145 | |
// [Console 2] | |
// $ go run sc_subtest.go -rnd 145 -usesub | |
// 2016/12/13 17:18:00 Creating a consumer for topic testTopic_145 with groupdId testGroup_145 ... | |
// 2016/12/13 17:18:00 Wait for 300 seconds before shutting down ... | |
// Starting to poll ... | |
// 2016/12/13 17:18:00 Rebalanced | |
// 2016/12/13 17:18:00 Claimed partition [testTopic_145/0] | |
// | |
// 1.3 publish more messages to test topic 145 | |
// [Console 1] | |
// $ go run sc_subtest.go -rnd 145 -usepub | |
// 2016/12/13 17:19:16 Creating a producer for topic testTopic_145 ... | |
// 2016/12/13 17:19:16 Message Hello 0 published to partition 0 at offset 5 | |
// 2016/12/13 17:19:16 Message Hello 1 published to partition 0 at offset 6 | |
// ... | |
// [Console 2] | |
// 2016/12/13 17:19:16 Consumed testTopic_145/0/5 value 'Hello 0' | |
// 2016/12/13 17:19:16 Consumed testTopic_145/0/6 value 'Hello 1' | |
// 2016/12/13 17:19:16 Consumed testTopic_145/0/7 value 'Hello 2' | |
// 2016/12/13 17:19:16 Consumed testTopic_145/0/8 value 'Hello 3' | |
// 2016/12/13 17:19:16 Consumed testTopic_145/0/9 value 'Hello 4' | |
// | |
// 2. not working case where the consumer fails to claim the partition
// (assuming 146 is a previously unused suffix)
// 2.1 subscribe to test topic 146 | |
// $ go run sc_subtest.go -rnd 146 -usesub | |
// [Console 2] | |
// 2016/12/13 17:23:23 Creating a consumer for topic testTopic_146 with groupdId testGroup_146 ... | |
// 2016/12/13 17:23:23 Wait for 300 seconds before shutting down ... | |
// Starting to poll ... | |
// 2016/12/13 17:23:24 Rebalanced | |
// | |
// 2.2 publish some messages to test topic 146 | |
// [Console 1] | |
// $ go run sc_subtest.go -rnd 146 -usepub | |
// 2016/12/13 17:24:29 Creating a producer for topic testTopic_146 ... | |
// 2016/12/13 17:24:29 Message Hello 0 published to partition 0 at offset 0 | |
// 2016/12/13 17:24:29 Message Hello 1 published to partition 0 at offset 1 | |
// 2016/12/13 17:24:29 Message Hello 2 published to partition 0 at offset 2 | |
// 2016/12/13 17:24:29 Message Hello 3 published to partition 0 at offset 3 | |
// 2016/12/13 17:24:29 Message Hello 4 published to partition 0 at offset 4 | |
// 2016/12/13 17:24:29 Published: 5 | |
//[Console 2] | |
// | |
// 3. not working case (same as case 2) but using a single program for convenience
// [Console 1] | |
// $ go run sc_subtest.go -rnd 147 -usepub -usesub | |
// 2016/12/13 17:29:34 Creating a consumer for topic testTopic_147 with groupdId testGroup_147 ... | |
// 2016/12/13 17:29:34 Creating a producer for topic testTopic_147 ... | |
// Starting to poll ... | |
// 2016/12/13 17:29:35 Rebalanced | |
// 2016/12/13 17:29:35 Message Hello 0 published to partition 0 at offset 0 | |
// 2016/12/13 17:29:35 Message Hello 1 published to partition 0 at offset 1 | |
// 2016/12/13 17:29:35 Message Hello 2 published to partition 0 at offset 2 | |
// 2016/12/13 17:29:35 Message Hello 3 published to partition 0 at offset 3 | |
// 2016/12/13 17:29:35 Message Hello 4 published to partition 0 at offset 4 | |
// 2016/12/13 17:29:35 Published: 5 | |
// 2016/12/13 17:29:35 Wait for 300 seconds before shutting down ... | |
// | |
func main() { | |
brokerStr := flag.String("brokers", "localhost:9092", "bootstrap broker addresses") | |
rnd := flag.String("rnd", "0", "random identifier to be used as suffix") | |
maxwaittime := flag.Int("max_wait_time", 1000, "max wait time") | |
usepub := flag.Bool("usepub", false, "activate the pub part") | |
usesub := flag.Bool("usesub", false, "activate the sub part") | |
flag.Parse() | |
topic := fmt.Sprintf("testTopic_%s", *rnd) | |
groupid := fmt.Sprintf("testGroup_%s", *rnd) | |
brokers := strings.Split(*brokerStr, ",") | |
config := cluster.NewConfig() | |
config.Consumer.MaxWaitTime = time.Duration(*maxwaittime) * time.Millisecond | |
config.Consumer.Return.Errors = true | |
config.Group.Return.Notifications = true | |
var consumer *cluster.Consumer | |
var producer sarama.SyncProducer | |
var err error | |
if *usesub { | |
log.Printf("Creating a consumer for topic %s with groupdId %s ...", topic, groupid) | |
consumer, err = cluster.NewConsumer(brokers, groupid, []string{topic}, config) | |
if err != nil { | |
panic(err) | |
} | |
defer func() { | |
if err := consumer.Close(); err != nil { | |
log.Printf("[ERROR] %v", err) | |
} | |
}() | |
go func() { | |
for err := range consumer.Errors() { | |
log.Printf("Error: %s", err.Error()) | |
} | |
}() | |
go func() { | |
for note := range consumer.Notifications() { | |
log.Println("Rebalanced") | |
for topic, partitions := range note.Claimed { | |
for partition := range partitions { | |
log.Printf("Claimed partition [%s/%d]", topic, partition) | |
} | |
} | |
for topic, partitions := range note.Released { | |
for partition := range partitions { | |
log.Printf("Released partition [%s/%d]", topic, partition) | |
} | |
} | |
} | |
}() | |
} | |
if *usepub { | |
log.Printf("Creating a producer for topic %s ...", topic) | |
producer, err = sarama.NewSyncProducer(brokers, nil) | |
if err != nil { | |
panic(err) | |
} | |
} | |
consumed := 0 | |
published := 0 | |
terminate := make(chan bool, 1) | |
if *usesub { | |
// start consuming messages | |
go func() { | |
fmt.Println("Starting to poll ...") | |
ConsumerLoop: | |
for { | |
select { | |
case msg := <-consumer.Messages(): | |
log.Printf("Consumed %s/%d/%d \tvalue\t '%s'\n", msg.Topic, msg.Partition, msg.Offset, msg.Value) | |
consumed++ | |
consumer.MarkOffset(msg, "") | |
case <-terminate: | |
break ConsumerLoop | |
} | |
} | |
}() | |
} | |
if *usepub { | |
// publish some messages .... | |
for i := 0; i < 5; i++ { | |
msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(fmt.Sprintf("Hello %d", i))} | |
p, o, err := producer.SendMessage(msg) | |
if err != nil { | |
log.Panicf("Ping message could not be published: %v", err) | |
} | |
published++ | |
log.Printf("Message %s published to partition %d at offset %d", msg.Value, p, o) | |
} | |
log.Printf("Published: %d\n", published) | |
} | |
if *usesub { | |
log.Println("Wait for 300 seconds before shutting down ...") | |
time.Sleep(time.Second * 300) | |
terminate <- true | |
log.Printf("Consumed: %d\n", consumed) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment