@crast
Last active September 26, 2017 00:17
multi-consumer-example

Run the following:

export CONSUMER_GROUP=temp-1
export TOPICS=topic1,topic2
export ADDRS=host1:9092,host2:9092
export NUM_CONSUMERS=<number of concurrent consumers to run>
go run multi-consumer-example.go

You'll find that when NUM_CONSUMERS > 1, behavior gets really strange. I tried 30 consumers initially, then lowered it to 2 to make the logs less spammy; the issue is the same.

Only with NUM_CONSUMERS=1 does everything work correctly.
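
Note that main() below ignores the error from strconv.ParseInt, so a missing or non-numeric NUM_CONSUMERS silently starts zero consumers. A minimal fail-fast sketch of the same env handling (getEnvInt is a hypothetical helper, not part of the gist):

package main

import (
	"log"
	"os"
	"strconv"
)

// getEnvInt is a hypothetical helper (not in the gist): it aborts on a
// missing or non-positive value instead of silently yielding 0.
func getEnvInt(name string) int64 {
	raw := os.Getenv(name)
	n, err := strconv.ParseInt(raw, 10, 64)
	if err != nil || n < 1 {
		log.Fatalf("%s must be a positive integer, got %q", name, raw)
	}
	return n
}

func main() {
	maxConsumers := getEnvInt("NUM_CONSUMERS")
	log.Printf("Going to start %d consumers", maxConsumers)
}

The full reproduction from the gist follows.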

package main

import (
	"container/heap"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	"gopkg.in/crast/app.v0"
)

const MaxInFlight = 100
const ShutdownTime = 1 * time.Second
const KB = 1024

var shutdownAll = make(chan int)

func main() {
	groupId := os.Getenv("CONSUMER_GROUP")
	topics := strings.Split(os.Getenv("TOPICS"), ",")

	config := cluster.NewConfig()
	config.Consumer.Fetch.Default = 400 * KB
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	client, err := cluster.NewClient(strings.Split(os.Getenv("ADDRS"), ","), config)
	if err != nil {
		log.Fatal(err)
	}

	app.AddCloser(func() {
		close(shutdownAll)
		client.Close()
	})

	maxConsumers, _ := strconv.ParseInt(os.Getenv("NUM_CONSUMERS"), 10, 64)
	log.Printf("Going to start %d consumers", maxConsumers)
	for i := int64(0); i < maxConsumers; i++ {
		consumer, err := cluster.NewConsumerFromClient(client, groupId, topics)
		if err != nil {
			log.Fatal(err)
		}
		consume(i, consumer)
	}
	app.Main()
}

func consume(i int64, c *cluster.Consumer) {
	prefix := fmt.Sprintf("Consumer %d", i)
	rebalancer := make(chan uint8, 1)

	go func() {
		for notification := range c.Notifications() {
			log.Printf("%s REBALANCE NOTICE: %#v", prefix, notification)
			rebalancer <- 1
		}
	}()

	go func() {
		for err := range c.Errors() {
			log.Printf("%s ERR: %s", prefix, err.Error())
		}
	}()

	app.Go(func() {
		rebalancer <- 0
		var inflight, commits chan int64
		var topic string
		var partition int32

		endPartition := func() {
			if inflight != nil {
				close(inflight)
				inflight = nil
			}
		}
		defer endPartition()

		for {
			select {
			case <-shutdownAll:
				return
			case <-rebalancer:
				if inflight != nil {
					endPartition()
				}
			case msg, ok := <-c.Messages():
				if !ok {
					endPartition()
					return
				}
				// Needed due to potential rebalance timing issues
				if inflight == nil || msg.Topic != topic || msg.Partition != partition {
					endPartition()
					log.Printf("%s: Topic switch from %s %d to %s %d", prefix, topic, partition, msg.Topic, msg.Partition)
					topic, partition = msg.Topic, msg.Partition
					inflight = make(chan int64)
					commits = make(chan int64, MaxInFlight)
					go committer(c, inflight, commits, msg.Topic, msg.Partition)
				}
				inflight <- msg.Offset
				go doWork(msg, commits)
			}
		}
	})
}

func doWork(msg *sarama.ConsumerMessage, commits chan int64) {
	//db.Write(msg)
	// simulate writing something to a database that takes 10ms
	time.Sleep(10 * time.Millisecond)
	log.Printf("Finished message %d %d", msg.Partition, msg.Offset)
	commits <- msg.Offset
}

// committer allows processing messages "out of order" while only committing up to the latest "in order" message.
// It also applies backpressure if things get too far out of order.
//
// It effectively limits the concurrency of in-flight operations as well, since the "inflight" chan
// stops being read from until we get some commits back, causing things to block.
func committer(c *cluster.Consumer, inflight, commits chan int64, topic string, partition int32) {
	var delayedAcks, inFlightAcks OffsetHeap

	handleOffset := func(rcvdOffset int64) bool {
		if len(inFlightAcks) > 0 && rcvdOffset == inFlightAcks[0] {
			// If our offset is the same as the earliest in-flight offset, then we want to ack up to it.
			toAck := rcvdOffset
			heap.Pop(&inFlightAcks)
			// Also loop through any consecutive acks we'd previously delayed so we can ack up to them.
			for len(delayedAcks) != 0 && len(inFlightAcks) != 0 {
				if v := delayedAcks[0]; v == inFlightAcks[0] {
					toAck = v
					heap.Pop(&delayedAcks)
					heap.Pop(&inFlightAcks)
				} else {
					break
				}
			}
			c.MarkPartitionOffset(topic, partition, toAck, "")
			return true
		} else {
			// This ack is out-of-order; push it onto a min-heap and ack it later.
			heap.Push(&delayedAcks, rcvdOffset)
			return false
		}
	}

	inFlightChan := inflight

	// Main loop: handle the normal operation of the ack-tracking feature.
AckTrackMain:
	for {
		select {
		case rcvdOffset := <-commits:
			if handleOffset(rcvdOffset) {
				inFlightChan = inflight // Reset the channel back if we succeeded in acking something
			}
		case inflightOffset, ok := <-inFlightChan:
			if !ok {
				break AckTrackMain
			} else {
				heap.Push(&inFlightAcks, inflightOffset)
				if len(inFlightAcks) >= MaxInFlight {
					inFlightChan = nil
				}
			}
		}
	}

	// Shutdown loop: handle the shutdown scenario.
	timeout := time.After(ShutdownTime)
	for len(inFlightAcks) != 0 {
		select {
		case rcvdOffset := <-commits:
			handleOffset(rcvdOffset)
		case <-timeout:
			// if we timed out, shut down anyway
			return
		}
	}
}

type OffsetHeap []int64

func (h OffsetHeap) Len() int           { return len(h) }
func (h OffsetHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h OffsetHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *OffsetHeap) Push(x interface{}) {
	// Push and Pop use pointer receivers because they modify the slice's length,
	// not just its contents.
	*h = append(*h, x.(int64))
}

func (h *OffsetHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}
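
To make the committer's ordering guarantee easier to follow, here is a minimal, self-contained sketch (my own illustration, not part of the gist; no Kafka involved) that drives the same two-heap bookkeeping with hard-coded offsets:

package main

import (
	"container/heap"
	"fmt"
)

// OffsetHeap is the same min-heap of offsets used by the gist's committer.
type OffsetHeap []int64

func (h OffsetHeap) Len() int            { return len(h) }
func (h OffsetHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h OffsetHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *OffsetHeap) Push(x interface{}) { *h = append(*h, x.(int64)) }
func (h *OffsetHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	var delayedAcks, inFlightAcks OffsetHeap

	// Offsets 1..5 are handed to workers in order, so they enter the in-flight heap in order.
	for off := int64(1); off <= 5; off++ {
		heap.Push(&inFlightAcks, off)
	}

	// Workers finish out of order.
	for _, rcvd := range []int64{3, 1, 2, 5, 4} {
		if len(inFlightAcks) > 0 && rcvd == inFlightAcks[0] {
			// The smallest in-flight offset finished: we can advance the marked offset.
			toAck := rcvd
			heap.Pop(&inFlightAcks)
			// Drain any previously-delayed acks that are now contiguous.
			for len(delayedAcks) != 0 && len(inFlightAcks) != 0 && delayedAcks[0] == inFlightAcks[0] {
				toAck = delayedAcks[0]
				heap.Pop(&delayedAcks)
				heap.Pop(&inFlightAcks)
			}
			fmt.Printf("completed %d -> mark offset %d\n", rcvd, toAck)
		} else {
			// Out of order: park it until the gap closes.
			heap.Push(&delayedAcks, rcvd)
			fmt.Printf("completed %d -> delayed\n", rcvd)
		}
	}
}

Completions 3 and 5 arrive ahead of smaller offsets and are parked in delayedAcks; the marked offset only advances when the smallest in-flight offset completes, so the marks go out at 1, then 3, then 5.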
The gist also includes a simpler variant that processes each message inline and marks its offset directly, without the committer goroutine:

package main

import (
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	"gopkg.in/crast/app.v0"
)

const MaxInFlight = 100
const ShutdownTime = 1 * time.Second
const KB = 1024

var shutdownAll = make(chan int)

func main() {
	groupId := os.Getenv("CONSUMER_GROUP")
	topics := strings.Split(os.Getenv("TOPICS"), ",")

	config := cluster.NewConfig()
	config.Consumer.Fetch.Default = 400 * KB
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	client, err := cluster.NewClient(strings.Split(os.Getenv("ADDRS"), ","), config)
	if err != nil {
		log.Fatal(err)
	}

	app.AddCloser(func() {
		close(shutdownAll)
		client.Close()
	})

	maxConsumers, _ := strconv.ParseInt(os.Getenv("NUM_CONSUMERS"), 10, 64)
	log.Printf("Going to start %d consumers", maxConsumers)
	for i := int64(0); i < maxConsumers; i++ {
		consumer, err := cluster.NewConsumerFromClient(client, groupId, topics)
		if err != nil {
			log.Fatal(err)
		}
		consume(i, consumer)
	}
	app.Main()
}

func consume(i int64, c *cluster.Consumer) {
	prefix := fmt.Sprintf("Consumer %d", i)
	rebalancer := make(chan uint8, 1)

	go func() {
		for notification := range c.Notifications() {
			log.Printf("%s REBALANCE NOTICE: %#v", prefix, notification)
			rebalancer <- 1
		}
	}()

	go func() {
		for err := range c.Errors() {
			log.Printf("%s ERR: %s", prefix, err.Error())
		}
	}()

	app.Go(func() {
		rebalancer <- 0
		var inflight chan int64
		var topic string
		var partition int32

		endPartition := func() {
			if inflight != nil {
				close(inflight)
				inflight = nil
			}
		}
		defer endPartition()

		for {
			select {
			case <-shutdownAll:
				return
			case <-rebalancer:
				if inflight != nil {
					endPartition()
				}
			case msg, ok := <-c.Messages():
				if !ok {
					endPartition()
					return
				}
				// Needed due to potential rebalance timing issues
				if inflight == nil || msg.Topic != topic || msg.Partition != partition {
					endPartition()
					log.Printf("%s: Topic switch from %s %d to %s %d", prefix, topic, partition, msg.Topic, msg.Partition)
					topic, partition = msg.Topic, msg.Partition
				}
				time.Sleep(10 * time.Millisecond)
				log.Printf("Finished message %d %d", msg.Partition, msg.Offset)
				c.MarkPartitionOffset(topic, partition, msg.Offset, "")
			}
		}
	})
}