|
package main |
|
|
|
import ( |
|
"container/heap" |
|
"fmt" |
|
"log" |
|
"os" |
|
"strconv" |
|
"strings" |
|
"time" |
|
|
|
"github.com/Shopify/sarama" |
|
cluster "github.com/bsm/sarama-cluster" |
|
"gopkg.in/crast/app.v0" |
|
) |
|
|
|
// Tunables shared by the consumer loop and the offset committer.
const (
	// MaxInFlight bounds how many offsets a committer tracks as dispatched
	// but not yet acknowledged; it is also the buffer size of each
	// partition's commits channel.
	MaxInFlight = 100

	// ShutdownTime is how long a committer keeps waiting for outstanding
	// acknowledgements once its inflight channel has been closed.
	ShutdownTime = 1 * time.Second

	// KB is the number of bytes in a kibibyte, used for size arithmetic.
	KB = 1024
)

// shutdownAll is closed (never sent on) to tell every consumer loop to stop.
var shutdownAll = make(chan int)
|
|
|
func main() { |
|
groupId := os.Getenv("CONSUMER_GROUP") |
|
topics := strings.Split(os.Getenv("TOPICS"), ",") |
|
config := cluster.NewConfig() |
|
config.Consumer.Fetch.Default = 400 * KB |
|
config.Consumer.Return.Errors = true |
|
config.Consumer.Offsets.Initial = sarama.OffsetNewest |
|
client, err := cluster.NewClient(strings.Split(os.Getenv("ADDRS"), ","), config) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
app.AddCloser(func() { |
|
close(shutdownAll) |
|
client.Close() |
|
}) |
|
maxConsumers, _ := strconv.ParseInt(os.Getenv("NUM_CONSUMERS"), 10, 64) |
|
log.Printf("Going to start %d consumers", maxConsumers) |
|
for i := int64(0); i < maxConsumers; i++ { |
|
consumer, err := cluster.NewConsumerFromClient(client, groupId, topics) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
consume(i, consumer) |
|
} |
|
app.Main() |
|
} |
|
|
|
func consume(i int64, c *cluster.Consumer) { |
|
prefix := fmt.Sprintf("Consumer %d", i) |
|
rebalancer := make(chan uint8, 1) |
|
go func() { |
|
for notification := range c.Notifications() { |
|
log.Printf("%s REBALANCE NOTICE: %#v", prefix, notification) |
|
rebalancer <- 1 |
|
} |
|
}() |
|
|
|
go func() { |
|
for err := range c.Errors() { |
|
log.Printf("%s ERR: %s", prefix, err.Error()) |
|
} |
|
}() |
|
|
|
app.Go(func() { |
|
rebalancer <- 0 |
|
var inflight, commits chan int64 |
|
var topic string |
|
var partition int32 |
|
|
|
endPartition := func() { |
|
if inflight != nil { |
|
close(inflight) |
|
inflight = nil |
|
} |
|
} |
|
defer endPartition() |
|
|
|
for { |
|
select { |
|
case <-shutdownAll: |
|
return |
|
case <-rebalancer: |
|
if inflight != nil { |
|
endPartition() |
|
} |
|
case msg, ok := <-c.Messages(): |
|
if !ok { |
|
endPartition() |
|
return |
|
} |
|
// Needed due t potential rebalance timing issues |
|
if inflight == nil || msg.Topic != topic || msg.Partition != partition { |
|
endPartition() |
|
log.Printf("%s: Topic switch from %s %d to %s %d", prefix, topic, partition, msg.Topic, msg.Partition) |
|
topic, partition = msg.Topic, msg.Partition |
|
inflight = make(chan int64) |
|
commits = make(chan int64, MaxInFlight) |
|
go committer(c, inflight, commits, msg.Topic, msg.Partition) |
|
} |
|
inflight <- msg.Offset |
|
go doWork(msg, commits) |
|
} |
|
} |
|
}) |
|
} |
|
|
|
func doWork(msg *sarama.ConsumerMessage, commits chan int64) { |
|
//db.Write(msg) |
|
// simulate writing something to database that takes 10ms |
|
time.Sleep(10 * time.Millisecond) |
|
log.Printf("Finished message %d %d", msg.Partition, msg.Offset) |
|
commits <- msg.Offset |
|
} |
|
|
|
// This allows processing messages "out of order" but only committing to the latest "in order" message. |
|
// It also causes backpressure, if things get too far out of order. |
|
// |
|
// It effectively limits the concurrency of in-flight operations as well, since the "inflight" chan |
|
// stops getting read from until we get some commits back, causing things to block. |
|
func committer(c *cluster.Consumer, inflight, commits chan int64, topic string, partition int32) { |
|
var delayedAcks, inFlightAcks OffsetHeap |
|
|
|
handleOffset := func(rcvdOffset int64) bool { |
|
if len(inFlightAcks) > 0 && rcvdOffset == inFlightAcks[0] { |
|
// If our offset is the same as the last in-flight offset, then we want to ack up. |
|
toAck := rcvdOffset |
|
heap.Pop(&inFlightAcks) |
|
// Also loop through any consecutive acks we'd previously delayed so we can ack up to them. |
|
for len(delayedAcks) != 0 && len(inFlightAcks) != 0 { |
|
if v := delayedAcks[0]; v == inFlightAcks[0] { |
|
toAck = v |
|
heap.Pop(&delayedAcks) |
|
heap.Pop(&inFlightAcks) |
|
} else { |
|
break |
|
} |
|
} |
|
c.MarkPartitionOffset(topic, partition, toAck, "") |
|
return true |
|
} else { |
|
// This ack is out-of-order, push it into a min-heap, we will ack it later. |
|
heap.Push(&delayedAcks, rcvdOffset) |
|
return false |
|
} |
|
} |
|
|
|
inFlightChan := inflight |
|
|
|
// Main loop: Handle the normal operation of the ack-tracking feature. |
|
AckTrackMain: |
|
for { |
|
select { |
|
case rcvdOffset := <-commits: |
|
if handleOffset(rcvdOffset) { |
|
inFlightChan = inflight // Reset the channel back if we succeeded in acking something |
|
} |
|
case inflightOffset, ok := <-inFlightChan: |
|
if !ok { |
|
break AckTrackMain |
|
} else { |
|
heap.Push(&inFlightAcks, inflightOffset) |
|
if len(inFlightAcks) >= MaxInFlight { |
|
inFlightChan = nil |
|
} |
|
} |
|
} |
|
} |
|
|
|
// Shutdown loop: handle the shutdown scenario. |
|
timeout := time.After(ShutdownTime) |
|
for len(inFlightAcks) != 0 { |
|
select { |
|
case rcvdOffset := <-commits: |
|
handleOffset(rcvdOffset) |
|
case <-timeout: |
|
// if we timed out, shutdown too |
|
return |
|
} |
|
} |
|
} |
|
|
|
// OffsetHeap is a slice of offsets providing the Len/Less/Swap/Push/Pop
// method set expected by container/heap. Less orders smaller offsets first,
// so when driven through the heap package the smallest offset sits at
// index 0.
type OffsetHeap []int64

// Len reports how many offsets are stored.
func (h OffsetHeap) Len() int {
	return len(h)
}

// Less reports whether the offset at index i is smaller than the one at j.
func (h OffsetHeap) Less(i, j int) bool {
	return h[j] > h[i]
}

// Swap exchanges the offsets at indexes i and j.
func (h OffsetHeap) Swap(i, j int) {
	tmp := h[i]
	h[i] = h[j]
	h[j] = tmp
}

// Push appends x, which must be an int64, to the end of the slice. It has a
// pointer receiver because it changes the slice's length, not just its
// contents. Callers wanting heap ordering should use heap.Push instead.
func (h *OffsetHeap) Push(x interface{}) {
	offset := x.(int64)
	*h = append(*h, offset)
}

// Pop removes and returns the last element of the slice. It has a pointer
// receiver for the same reason as Push. Callers wanting the minimum should
// use heap.Pop instead.
func (h *OffsetHeap) Pop() interface{} {
	items := *h
	last := len(items) - 1
	tail := items[last]
	*h = items[:last]
	return tail
}