Created
August 4, 2023 15:02
-
-
Save hugowetterberg/90bab978ef141b4dfaf0104fbe66c1f1 to your computer and use it in GitHub Desktop.
Concurrent bounded processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// Example to illustrate fan-out fan-in processing with batched writes, bounded | |
// queues, and backpressure for controlling the workload that the process takes | |
// on. | |
import ( | |
"crypto/sha256" | |
"fmt" | |
"os" | |
"os/signal" | |
"sync" | |
"syscall" | |
"time" | |
) | |
// Set up some bounds for our processing.
const (
	// The time it should take to read a message.
	messageReadTime = 3 * time.Millisecond

	// We only queue up 64 messages for processing.
	maxWaiting = 64

	// In addition to those 64 messages we can have 32 messages in-flight in
	// our workers.
	numWorkers = 32

	// The workers can queue up 100 result items for batching.
	maxQueued = 100

	// The batcher creates batches of up to 100 result items, bringing the
	// total result items we keep in memory up to 200.
	batchSize = 100

	// Batches will be submitted when they are full, or after 500ms has
	// passed since the first item was received.
	maxBatchWait = 500 * time.Millisecond

	// Dummy value that controls how long each batch write takes.
	sinkLatency = 500 * time.Millisecond
)
// InputDoc is a single message read from the source; it is the unit of
// work that gets handed to the processing workers.
type InputDoc struct {
	ID          int64
	Title       string
	Description string
}
// MessageSource produces a stream of dummy documents with increasing
// IDs. It is not safe for concurrent use: current is mutated without
// synchronization (only the single read loop touches it here).
type MessageSource struct {
	// current is the ID of the most recently produced document.
	current int64
}
func (ms *MessageSource) ReadMessage() InputDoc { | |
ms.current++ | |
time.Sleep(messageReadTime) | |
return InputDoc{ | |
ID: ms.current, | |
Title: fmt.Sprintf("Document %d", ms.current), | |
Description: fmt.Sprintf("This is the %d:th document that we have produced.", ms.current), | |
} | |
} | |
// Result is the outcome of processing one InputDoc: a SHA-256 digest
// computed over its fields (see processingWorker).
type Result struct {
	Hash []byte
}
// MessageSink simulates a downstream system that accepts batched
// writes with a fixed per-batch latency.
type MessageSink struct {
	// Latency is how long each WriteBatch call sleeps to simulate work.
	Latency time.Duration
}
func (ms *MessageSink) WriteBatch(batch []Result) { | |
if len(batch) == 0 { | |
return | |
} | |
fmt.Printf("\nWriting a batch of %d\n", len(batch)) | |
time.Sleep(ms.Latency) | |
} | |
func main() { | |
// Set up a channel for receiving OS signals (ctrl-c & TERM). | |
sigs := make(chan os.Signal, 1) | |
// ...and a channel for signalling to our read loop that we should stop | |
// consuming messages. | |
stop := make(chan struct{}) | |
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) | |
go func() { | |
<-sigs | |
println("\nstopping, wait for in-flight messages to finish processing") | |
close(stop) | |
}() | |
var source MessageSource | |
waitQueue := make(chan InputDoc, maxWaiting) | |
sinkQueue := make(chan Result, maxQueued) | |
// Start the read loop in a goroutine and close the wait queue when we | |
// stop reading. This will let the processing workers know that there | |
// won't be any more work coming once they have read all buffered | |
// messages. | |
go func() { | |
defer close(waitQueue) | |
readLoop(&source, stop, waitQueue) | |
}() | |
// Create a wait group that will keep track of how many of our workers | |
// still are alive and processing messages. | |
var wg sync.WaitGroup | |
wg.Add(numWorkers) | |
for i := 0; i < numWorkers; i++ { | |
go func() { | |
processingWorker(waitQueue, sinkQueue) | |
// Tell the wait group that our worker has exited. | |
wg.Done() | |
}() | |
} | |
// Once all workers have exited we close the queue that our sink reads | |
// from, so that it will exit once it has read all pending results. | |
go func() { | |
wg.Wait() | |
close(sinkQueue) | |
}() | |
sink := MessageSink{ | |
Latency: sinkLatency, | |
} | |
// Run the batch loop on our main goroutine so that our application | |
// exits after everything has been written to the sink. | |
batchLoop(sinkQueue, &sink) | |
} | |
func readLoop(source *MessageSource, stop chan struct{}, queue chan InputDoc) { | |
for { | |
// This is a potentially infinite loop. But the stop case will | |
// trigger when the stop channel is closed. | |
select { | |
case <-stop: | |
return | |
default: | |
} | |
queue <- source.ReadMessage() | |
print("+") | |
} | |
} | |
func processingWorker(input chan InputDoc, output chan Result) { | |
hash := sha256.New() | |
// Range over the input, this will consume messages until the channel is | |
// closed and we don't have any buffered messages to consume. | |
for in := range input { | |
_, _ = fmt.Fprintf(hash, "id: %d\n", in.ID) | |
_, _ = fmt.Fprintf(hash, "title: %s\n", in.Title) | |
_, _ = fmt.Fprintf(hash, "description: %s\n", in.Description) | |
output <- Result{ | |
Hash: hash.Sum(nil), | |
} | |
print("=") | |
hash.Reset() | |
} | |
} | |
// Loop that writes synchronously to our sink. If sink request serialisation was | |
// expensive, or if our sink makes good use of concurrent writes, we could | |
// extend the process with concurrent batching and writing to the sink; much | |
// like we do for the processing workers. | |
func batchLoop(input chan Result, sink *MessageSink) { | |
var ( | |
waitTimer <-chan time.Time | |
batch []Result | |
) | |
send := func() { | |
sink.WriteBatch(batch) | |
// Reset the batch slice but reuse the underlying array. | |
batch = batch[0:0] | |
// Set the wait timer to nil, read from an empty channel blocks | |
// forever. | |
waitTimer = nil | |
} | |
for { | |
// The select allows us to continue on whichever happens first | |
// of a triggered max wait timer or new item on the input | |
// channel. | |
select { | |
case <-waitTimer: | |
send() | |
case item, open := <-input: | |
if !open { | |
sink.WriteBatch(batch) | |
return | |
} | |
// Start our max wait timer if this was the first item | |
// in the batch. | |
if len(batch) == 0 { | |
waitTimer = time.After(maxBatchWait) | |
} | |
batch = append(batch, item) | |
print("-") | |
if len(batch) == batchSize { | |
send() | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment