Skip to content

Instantly share code, notes, and snippets.

@crast
Created January 27, 2018 21:05
Show Gist options
  • Save crast/61779d00db7bfaa894c70d7693cee505 to your computer and use it in GitHub Desktop.
Save crast/61779d00db7bfaa894c70d7693cee505 to your computer and use it in GitHub Desktop.
Benchmarking Reflect Channel Ops
// This is a test where I was trying to isolate how much we lost to `reflect`
// with channel operations in a tight loop.
//
// PREAMBLE
// We do a lot of processing of stream data where ordering with respect to a
// shard indicator like user ID matters. We've done this a few times where we
// implement fanout based on hashing into a slice/array of channels, and it's
// lightning fast but it ends up leaving a lot of boilerplate in your code.
//
// So I started writing a library that would do the non-critical boilerplate
// ops with 'reflect' with the assumption that the tight loop of the
// hash-and-forward operation would be suited to a little code-gen for the
// forwarding of 'your type'.
//
// But after benchmarking, I'm noticing that the reflect method is not THAT
// much slower. In a real app it would be easily many orders of magnitude less
// significant than the 'real' things happening like memory allocation,
// network, etc.
//
// ABOUT THIS TEST
// In order to test this, I've ripped out the bare minimum needed to support
// a test. I am pre-selecting the hash value to take that out of the critical
// path with the hope that we're mostly testing channel ops here.
//
// MY RESULTS
// Testing on an i7 macbook pro(2017) with 2 cores, 4 hyperthreads:
//
// $ CONSUMER_GOROUTINES=10 go test -bench . -v
// goos: darwin
// goarch: amd64
// BenchmarkFanout_Typed_Simple_depth10-4 5000000 310 ns/op
// BenchmarkFanout_Reflect_Simple_depth10-4 3000000 466 ns/op
// BenchmarkFanout_Typed_Select_depth10-4 3000000 462 ns/op
// BenchmarkFanout_Reflect_Select_depth10-4 2000000 738 ns/op
// BenchmarkFanout_Typed_Simple_depthN-4 10000000 223 ns/op
// BenchmarkFanout_Reflect_Simple_depthN-4 5000000 336 ns/op
// BenchmarkFanout_Typed_Select_depthN-4 5000000 322 ns/op
// BenchmarkFanout_Reflect_Select_depthN-4 2000000 617 ns/op
//
// Changing CONSUMER_GOROUTINES doesn't really change the outcome much on my machine.
package fanout
import (
"context"
"os"
"reflect"
"strconv"
"sync"
"testing"
)
// Following are the four implementations we want to test.

// handlerFunc is the signature shared by all four fanout implementations:
// read values from input, forward each one to an outputs channel chosen by
// its hash, and close every output channel on return.
type handlerFunc func(ctx context.Context, input chan *Thing, outputs []chan *Thing)
// IMPL 1: Typed fanout, no select.
// Baseline: plain typed channel receive and send in a tight loop.
// The ctx parameter is unused here; it exists only to satisfy handlerFunc.
func fanoutTyped_Simple(ctx context.Context, input chan *Thing, outputs []chan *Thing) {
	defer closeAll(outputs)
	n := uint64(len(outputs))
	for item := range input {
		target := outputs[hash(item)%n]
		target <- item
	}
}
// IMPL 2: Reflect fanout, no select.
// Thin adapter: wraps the typed channels in reflect.Values and delegates
// to the untyped implementation. Outputs are closed on return.
func fanoutReflect_Simple(ctx context.Context, input chan *Thing, outputs []chan *Thing) {
	defer closeAll(outputs)
	in := reflect.ValueOf(input)
	outs := reflect.ValueOf(outputs)
	reflect_Simple_Impl(in, outs)
}
// reflect_Simple_Impl drains input and forwards each received value to the
// output slot selected by hash-mod-len, using only reflect channel ops.
// It returns when input is closed and drained.
func reflect_Simple_Impl(input reflect.Value, outputs reflect.Value) {
	n := uint64(outputs.Len())
	for v, ok := input.Recv(); ok; v, ok = input.Recv() {
		slot := int(hash(v.Interface()) % n)
		outputs.Index(slot).Send(v)
	}
}
// IMPL 3: Typed fanout, with select. More realistic for production
// as you would use something akin to context.Context or a close channel
// to do clean teardown without having to consume through the buffer backlog.
func fanoutTyped_Select(ctx context.Context, input chan *Thing, outputs []chan *Thing) {
	defer closeAll(outputs)
	n := uint64(len(outputs))
	// Hoist Done() out of the loop; probably not a big optimization, but it
	// matches the reflect version.
	cancelled := ctx.Done()
	for {
		select {
		case <-cancelled:
			return
		case item, open := <-input:
			if !open {
				return
			}
			outputs[hash(item)%n] <- item
		}
	}
}
// IMPL 4: Reflect, with select.
// Adapter that wraps the typed channels for the reflect.Select-based
// implementation; outputs are closed on return.
func fanoutReflect_Select(ctx context.Context, input chan *Thing, outputs []chan *Thing) {
	defer closeAll(outputs)
	in := reflect.ValueOf(input)
	outs := reflect.ValueOf(outputs)
	reflect_Select_Impl(ctx, in, outs)
}
// reflect_Select_Impl is the reflect-based analogue of fanoutTyped_Select:
// a two-case select over the input channel and ctx.Done(), forwarding each
// received value to the hash-selected output channel. It returns when the
// context is cancelled or the input channel is closed.
func reflect_Select_Impl(ctx context.Context, input, outputs reflect.Value) {
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: input},             // case 0: input value
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}, // case 1: cancellation
	}
	n := uint64(outputs.Len())
	for {
		chosen, v, ok := reflect.Select(cases)
		if chosen != 0 {
			return // ctx.Done() fired
		}
		if !ok {
			return // input channel closed
		}
		slot := int(hash(v.Interface()) % n)
		outputs.Index(slot).Send(v)
	}
}
// harness runs one benchmark configuration: it starts $CONSUMER_GOROUTINES
// consumer goroutines, produces n values through an input channel of the
// given buffer depth, then times the supplied fanout handler pushing all
// values through. wg.Wait() returning means every consumer saw its channel
// close, i.e. all values were delivered.
func harness(b *testing.B, n, depth int, handler handlerFunc) {
	// Atoi errors are deliberately ignored: an unset or invalid variable
	// yields 0, which is detected (and reported) just below.
	goroutines, _ := strconv.Atoi(os.Getenv("CONSUMER_GOROUTINES"))
	downstreamDepth, _ := strconv.Atoi(os.Getenv("DOWNSTREAM_DEPTH"))
	if goroutines == 0 {
		b.Log("Make sure to set $CONSUMER_GOROUTINES to the goroutine count you want.")
		b.Log("Set $DOWNSTREAM_DEPTH (default 5) to the per-consumer queue depth")
		b.FailNow()
	}
	if downstreamDepth == 0 {
		downstreamDepth = 5
	}
	ctx, cancel := context.WithCancel(context.Background())
	// Defer the cancel so the context is released on every exit path
	// (including panics), not just the happy path.
	defer cancel()
	input := produce(n, depth)
	outputs := make([]chan *Thing, goroutines)
	var wg sync.WaitGroup
	wg.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		outputs[i] = make(chan *Thing, downstreamDepth)
		go consumer(&wg, outputs[i])
	}
	// Everything should be running, but blocked until we start the fanout handler.
	b.ResetTimer()
	go handler(ctx, input, outputs)
	wg.Wait() // All consumers being finished indicates we got all values
	cancel()  // explicitly unblock select-based handlers before returning
}
// Input channel Depth 10: produce() streams values from a separate goroutine
// through a small (10-slot) buffer, so the fanout contends with the producer.
func BenchmarkFanout_Typed_Simple_depth10(b *testing.B) {
harness(b, b.N, 10, fanoutTyped_Simple)
}
func BenchmarkFanout_Reflect_Simple_depth10(b *testing.B) {
harness(b, b.N, 10, fanoutReflect_Simple)
}
func BenchmarkFanout_Typed_Select_depth10(b *testing.B) {
harness(b, b.N, 10, fanoutTyped_Select)
}
func BenchmarkFanout_Reflect_Select_depth10(b *testing.B) {
harness(b, b.N, 10, fanoutReflect_Select)
}
// Input channel pre-filled: depth == b.N, so produce() fills the channel
// synchronously and closes it before the fanout starts — no producer
// goroutine is involved in the timed section.
func BenchmarkFanout_Typed_Simple_depthN(b *testing.B) {
harness(b, b.N, b.N, fanoutTyped_Simple)
}
func BenchmarkFanout_Reflect_Simple_depthN(b *testing.B) {
harness(b, b.N, b.N, fanoutReflect_Simple)
}
func BenchmarkFanout_Typed_Select_depthN(b *testing.B) {
harness(b, b.N, b.N, fanoutTyped_Select)
}
func BenchmarkFanout_Reflect_Select_depthN(b *testing.B) {
harness(b, b.N, b.N, fanoutReflect_Select)
}
package fanout
import (
"encoding/binary"
"hash/fnv"
"sync"
)
// Thing is the unit of work flowing through the fanout: a payload index plus
// a precomputed hash used for shard selection.
type Thing struct {
	Index int
	Hash  uint64
}

// makeThings builds n Things with precomputed hashes.
// We do the hashing in the producer because we want to isolate it from the
// benchmark if possible. The fnv state is never Reset between iterations, so
// each Hash covers the bytes of all indices written so far (cumulative) —
// for shard selection only distinctness matters.
func makeThings(n int) []*Thing {
	things := make([]*Thing, n)
	h := fnv.New64()
	for i := range things {
		binary.Write(h, binary.BigEndian, int64(i))
		things[i] = &Thing{Index: i, Hash: h.Sum64()}
	}
	return things
}
// produce returns a channel of buffer size depth carrying n pre-hashed
// Things, closed after the last value. When depth >= n the buffer holds
// everything, so the channel is filled and closed synchronously before
// returning — this isolates the benchmark from producer-goroutine context
// switching. Otherwise a goroutine streams the values in.
func produce(n int, depth int) chan *Thing {
	out := make(chan *Thing, depth)
	things := makeThings(n)
	send := func() {
		for _, t := range things {
			out <- t
		}
		close(out)
	}
	if depth >= n {
		send()
	} else {
		go send()
	}
	return out
}
// consumer drains c until it closes, then signals wg. Each element gets a
// trivial mutation so the compiler cannot optimize the loop away.
func consumer(wg *sync.WaitGroup, c <-chan *Thing) {
	defer wg.Done()
	for thing := range c {
		thing.Index *= 2 // do some real work so compiler doesn't compile us out
	}
}
// hash returns the shard hash precomputed on the *Thing.
// Cheating, but we want to isolate the benchmark from the hash algo itself.
func hash(v interface{}) uint64 {
	thing := v.(*Thing)
	return thing.Hash
}
// closeAll closes every output channel, signalling consumers to finish.
func closeAll(channels []chan *Thing) {
	for i := range channels {
		close(channels[i])
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment