Created
September 21, 2017 20:20
-
-
Save crast/c8570e63657f4d743319230f1f87f55e to your computer and use it in GitHub Desktop.
Committer example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"container/heap" | |
"log" | |
"time" | |
"github.com/Shopify/sarama" | |
"github.com/crast/sarama-cluster" | |
) | |
const (
	// MaxInFlight is the number of uncommitted offsets per partition at which
	// the committer stops accepting new in-flight offsets; it is also the
	// buffer size of the per-partition commits channel.
	MaxInFlight = 100

	// ShutdownTime is how long the committer keeps waiting for outstanding
	// commits once the in-flight channel has been closed.
	ShutdownTime = time.Second
)
func main() { | |
var consumer *cluster.Consumer = nil // TODO: must initialize | |
for p := range consumer.Partitions() { | |
go consume(consumer, p) | |
} | |
} | |
func consume(c *cluster.Consumer, p cluster.PartitionConsumer) { | |
topic, partition := p.TopicPartition() | |
inflight := make(chan int64) | |
commits := make(chan int64, MaxInFlight) | |
defer close(inflight) | |
go committer(c, p, inflight, commits) | |
log.Printf("Going to consume partition %d of topic %s", partition, topic) | |
for message := range p.Messages() { | |
inflight <- message.Offset | |
// if relative ordering is important, probably we write this to another channel or | |
// fanout to channels by some hash key. | |
go doWork(message, commits) | |
} | |
} | |
func doWork(msg *sarama.ConsumerMessage, commits chan int64) { | |
// write something to database | |
//db.Write(msg) | |
commits <- msg.Offset | |
} | |
// This allows processing messages "out of order" but only committing to the latest "in order" message. | |
// It also causes backpressure, if things get too far out of order. | |
// | |
// It effectively limits the concurrency of in-flight operations as well, since the "inflight" chan | |
// stops getting read from until we get some commits back, causing things to block. | |
func committer(c *cluster.Consumer, p cluster.PartitionConsumer, inflight, commits chan int64) { | |
defer p.Close() | |
var delayedAcks, inFlightAcks OffsetHeap | |
topic, partition := p.TopicPartition() | |
handleOffset := func(rcvdOffset int64) bool { | |
if len(inFlightAcks) > 0 && rcvdOffset == inFlightAcks[0] { | |
// If our offset is the same as the last in-flight offset, then we want to ack up. | |
toAck := rcvdOffset | |
heap.Pop(&inFlightAcks) | |
// Also loop through any consecutive acks we'd previously delayed so we can ack up to them. | |
for len(delayedAcks) != 0 && len(inFlightAcks) != 0 { | |
if v := delayedAcks[0]; v == inFlightAcks[0] { | |
toAck = v | |
heap.Pop(&delayedAcks) | |
heap.Pop(&inFlightAcks) | |
} else { | |
break | |
} | |
} | |
c.MarkPartitionOffset(topic, partition, toAck, "") | |
return true | |
} else { | |
// This ack is out-of-order, push it into a min-heap, we will ack it later. | |
heap.Push(&delayedAcks, rcvdOffset) | |
return false | |
} | |
} | |
inFlightChan := inflight | |
// Main loop: Handle the normal operation of the ack-tracking feature. | |
AckTrackMain: | |
for { | |
select { | |
case rcvdOffset := <-commits: | |
if handleOffset(rcvdOffset) { | |
inFlightChan = inflight // Reset the channel back if we succeeded in acking something | |
} | |
case inflightOffset, ok := <-inFlightChan: | |
if !ok { | |
break AckTrackMain | |
} else { | |
heap.Push(&inFlightAcks, inflightOffset) | |
if len(inFlightAcks) >= MaxInFlight { | |
inFlightChan = nil | |
} | |
} | |
} | |
} | |
// Shutdown loop: handle the shutdown scenario. | |
timeout := time.After(ShutdownTime) | |
for len(inFlightAcks) != 0 { | |
select { | |
case rcvdOffset := <-p.trackChan: | |
handleOffset(rcvdOffset) | |
case <-timeout: | |
// if we timed out, shutdown too | |
return | |
} | |
} | |
} | |
// OffsetHeap is a slice of offsets implementing the methods required by
// container/heap's Interface, ordered so that the smallest offset sits at
// index 0 when it is maintained through heap.Push and heap.Pop.
type OffsetHeap []int64

// Len reports the number of offsets currently stored.
func (h OffsetHeap) Len() int {
	return len(h)
}

// Less orders offsets ascending, which makes this a min-heap.
func (h OffsetHeap) Less(i, j int) bool {
	return h[j] > h[i]
}

// Swap exchanges the offsets at positions i and j.
func (h OffsetHeap) Swap(i, j int) {
	tmp := h[i]
	h[i] = h[j]
	h[j] = tmp
}

// Push appends x, which must be an int64, to the end of the slice.
// It takes a pointer receiver because it changes the slice's length,
// not just its contents.
func (h *OffsetHeap) Push(x interface{}) {
	offset := x.(int64)
	*h = append(*h, offset)
}

// Pop removes the last element of the slice and returns it.
// It takes a pointer receiver for the same reason as Push.
func (h *OffsetHeap) Pop() interface{} {
	last := len(*h) - 1
	offset := (*h)[last]
	*h = (*h)[:last]
	return offset
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment