Skip to content

Instantly share code, notes, and snippets.

@crast
Created September 21, 2017 20:20
Show Gist options
  • Save crast/c8570e63657f4d743319230f1f87f55e to your computer and use it in GitHub Desktop.
Save crast/c8570e63657f4d743319230f1f87f55e to your computer and use it in GitHub Desktop.
Committer example
package main
import (
	"container/heap"
	"log"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/crast/sarama-cluster"
)
// Tuning knobs shared by consume and committer.
const (
	// MaxInFlight is how many offsets a partition may have outstanding before
	// the committer stops accepting new ones; it also sizes the commits buffer.
	MaxInFlight = 100

	// ShutdownTime bounds how long a committer keeps draining finished offsets
	// after its partition stops delivering messages.
	ShutdownTime = time.Second
)
// main starts a consume goroutine for every partition the cluster consumer
// hands out, and waits for all of them to finish before exiting.
func main() {
	var consumer *cluster.Consumer // TODO: must initialize
	if consumer == nil {
		// Fail with a clear message rather than a nil-pointer panic inside
		// consumer.Partitions().
		log.Fatal("cluster consumer is not initialized")
	}

	var wg sync.WaitGroup
	for p := range consumer.Partitions() {
		p := p // each goroutine gets its own partition (pre-Go 1.22 semantics)
		wg.Add(1)
		go func() {
			defer wg.Done()
			consume(consumer, p)
		}()
	}
	// The loop only ends when Partitions() is closed; returning right away
	// would end the process while consume goroutines are still running.
	wg.Wait()
}
// consume processes every message from partition p. It returns only after
// the partition's committer has finished, that is, after the committer has
// marked what it could and closed p.
//
// Each message's offset is handed to the committer on inflight before its
// work starts, so the committer can hold back marks until all lower offsets
// are done. inflight is unbuffered: when the committer stops receiving
// because MaxInFlight offsets are outstanding, this loop blocks, which
// throttles dispatch.
func consume(c *cluster.Consumer, p cluster.PartitionConsumer) {
	topic, partition := p.TopicPartition()
	inflight := make(chan int64)
	// Sized to MaxInFlight: outstanding work is capped at MaxInFlight, so a
	// finished doWork should be able to send here even after the committer
	// has given up during shutdown.
	commits := make(chan int64, MaxInFlight)

	committerDone := make(chan struct{})
	go func() {
		defer close(committerDone)
		committer(c, p, inflight, commits)
	}()

	log.Printf("Going to consume partition %d of topic %s", partition, topic)
	for message := range p.Messages() {
		inflight <- message.Offset
		// Work runs concurrently, so completion order is arbitrary. If relative
		// ordering matters, route messages to another channel, or fan out to
		// per-key channels by a hash of the key, instead.
		go doWork(message, commits)
	}

	// No more messages. Closing inflight moves the committer into its
	// shutdown phase; wait for it so callers know the partition is finished.
	close(inflight)
	<-committerDone
}
// doWork handles a single message and then reports its offset on commits so
// the committer can mark it. The processing step is only a placeholder here.
func doWork(msg *sarama.ConsumerMessage, commits chan int64) {
	// Placeholder for the real work, e.g. a database write: db.Write(msg)
	finished := msg.Offset
	commits <- finished
}
// committer marks processed offsets for one partition. Work may finish out of
// order, but an offset is marked only once it and every lower dispatched
// offset have been reported as done.
//
// Offsets arrive on inflight when their work is dispatched, and on commits
// when that work finishes. A finished offset that is ahead of an unfinished
// one is parked in a min-heap until the gap closes.
//
// committer also applies backpressure. Once MaxInFlight offsets are
// outstanding, it stops receiving from inflight until an in-order ack frees a
// slot. That blocks a sender using an unbuffered inflight channel (as consume
// does), and so limits the number of concurrent in-flight operations.
//
// When inflight is closed, committer keeps draining commits for at most
// ShutdownTime so already-dispatched work can still be marked, then closes p.
func committer(c *cluster.Consumer, p cluster.PartitionConsumer, inflight, commits chan int64) {
	defer func() {
		if err := p.Close(); err != nil {
			log.Printf("closing partition consumer: %v", err)
		}
	}()

	// inFlightAcks holds dispatched offsets that are not yet marked.
	// delayedAcks holds finished offsets still waiting on a lower in-flight one.
	var delayedAcks, inFlightAcks OffsetHeap
	topic, partition := p.TopicPartition()

	// handleOffset records rcvdOffset as finished. If it is the lowest
	// outstanding in-flight offset, it marks the partition up to the end of the
	// run of delayed offsets that now head the in-flight heap, and returns
	// true. Otherwise it parks the offset in delayedAcks and returns false.
	handleOffset := func(rcvdOffset int64) bool {
		if len(inFlightAcks) == 0 || rcvdOffset != inFlightAcks[0] {
			// Out of order: keep it until the lower offsets catch up.
			heap.Push(&delayedAcks, rcvdOffset)
			return false
		}
		toAck := rcvdOffset
		heap.Pop(&inFlightAcks)
		// Absorb previously delayed acks that are now at the head of the queue.
		for len(delayedAcks) != 0 && len(inFlightAcks) != 0 && delayedAcks[0] == inFlightAcks[0] {
			toAck = delayedAcks[0]
			heap.Pop(&delayedAcks)
			heap.Pop(&inFlightAcks)
		}
		c.MarkPartitionOffset(topic, partition, toAck, "")
		return true
	}

	// inFlightChan is the channel new offsets are currently accepted from.
	// It is set to nil (a nil channel never becomes ready in select) to apply
	// backpressure.
	inFlightChan := inflight

	// Main loop: normal operation of the ack tracking.
AckTrackMain:
	for {
		select {
		case rcvdOffset := <-commits:
			if handleOffset(rcvdOffset) {
				// An in-order ack freed at least one slot; accept new offsets again.
				inFlightChan = inflight
			}
		case inflightOffset, ok := <-inFlightChan:
			if !ok {
				break AckTrackMain
			}
			heap.Push(&inFlightAcks, inflightOffset)
			if len(inFlightAcks) >= MaxInFlight {
				inFlightChan = nil
			}
		}
	}

	// Shutdown loop: no new offsets will arrive. Give outstanding work up to
	// ShutdownTime to report back on commits.
	timeout := time.After(ShutdownTime)
	for len(inFlightAcks) != 0 {
		select {
		case rcvdOffset := <-commits:
			handleOffset(rcvdOffset)
		case <-timeout:
			log.Printf("shutdown of partition %d of topic %s timed out with %d offsets unmarked",
				partition, topic, len(inFlightAcks))
			return
		}
	}
}
// OffsetHeap is a min-heap of message offsets. It implements heap.Interface
// and is meant to be driven through container/heap (heap.Push / heap.Pop).
// Used that way, the smallest offset is always at index 0.
type OffsetHeap []int64

// Len returns how many offsets are stored.
func (h OffsetHeap) Len() int {
	return len(h)
}

// Less orders offsets from lowest to highest, which makes this a min-heap.
func (h OffsetHeap) Less(i, j int) bool {
	return h[j] > h[i]
}

// Swap exchanges the offsets at positions i and j.
func (h OffsetHeap) Swap(i, j int) {
	tmp := h[i]
	h[i] = h[j]
	h[j] = tmp
}

// Push appends x, which must be an int64, to the end of the slice. It has a
// pointer receiver because it changes the slice's length, not just its
// contents.
func (h *OffsetHeap) Push(x interface{}) {
	*h = append(*h, x.(int64))
}

// Pop removes and returns the last element of the slice. container/heap
// moves the minimum there before calling it. It has a pointer receiver
// because it shrinks the slice.
func (h *OffsetHeap) Pop() interface{} {
	s := *h
	end := len(s) - 1
	top := s[end]
	*h = s[:end]
	return top
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment