Skip to content

Instantly share code, notes, and snippets.

@crast
Last active September 29, 2017 21:52
Show Gist options
  • Save crast/2f179d88d169cea7e5deadb7f11d593f to your computer and use it in GitHub Desktop.
Save crast/2f179d88d169cea7e5deadb7f11d593f to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"log"
"os"
"os/signal"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
// init (custom) config, enable errors and notifications
// set mode to ConsumerModePartitions
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Group.Mode = cluster.ConsumerModePartitions
// init consumer
brokers := []string{"127.0.0.1:9092"}
topics := []string{"my_topic", "other_topic"}
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
if err != nil {
panic(err)
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go func() {
for _ = range signals {
consumer.Close()
}
}()
go func() {
for ntf := range consumer.Notifications() {
log.Printf("Rebalanced: %+v\n", ntf)
}
}()
// consume partitions
for part := range consumer.Partitions() {
// start a separate goroutine to consume the partition;
go func(pc cluster.PartitionConsumer) {
errChan := pc.Errors()
select {
case msg, more := <-pc.Messages():
if !more { // exit when the partition is closed
return
}
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
case err, more := <-errChan:
if more {
log.Printf("Error: %s\n", err.Error())
} else {
errChan = nil // make sure to drain messages
}
}
}(part)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment