@avary
Forked from mcheviron/main.go
Created March 12, 2024 08:37
Golang array map, concurrency problem
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// Map applies f to every element of arr sequentially and returns the results.
func Map[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 {
	arrT2 := make([]T2, len(arr))
	for i, t := range arr {
		t2 := f(t, i)
		arrT2[i] = t2
	}
	return arrT2
}
// MapConcurrent spawns one goroutine per element. The result is correct (each
// goroutine writes to its own index), but the per-element goroutine and
// closure overhead dominates for large slices. Capturing i and t inside the
// closure relies on Go 1.22+ per-iteration loop variables.
func MapConcurrent[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 {
	var wg sync.WaitGroup
	wg.Add(len(arr))

	arrT2 := make([]T2, len(arr))

	for i, t := range arr {
		go func() {
			t2 := f(t, i)
			arrT2[i] = t2
			wg.Done()
		}()
	}

	wg.Wait()
	return arrT2
}
// MapConcurrentWorkerPool splits arr into one contiguous chunk per CPU and
// lets each worker goroutine fill its own shard of the output slice, with no
// channels involved.
func MapConcurrentWorkerPool[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 {
	arrT2 := make([]T2, len(arr))
	numworkers := runtime.NumCPU()

	var wg sync.WaitGroup
	wg.Add(numworkers)

	worker := func(startIndex, endIndex int) {
		for i := startIndex; i < endIndex; i++ {
			t2 := f(arr[i], i)
			arrT2[i] = t2
		}
		wg.Done()
	}

	chunkSize := len(arr) / numworkers
	for i := 0; i < numworkers; i++ {
		startIndex := i * chunkSize
		endIndex := (i + 1) * chunkSize
		if i == numworkers-1 {
			// The last worker picks up any remainder left by integer division.
			endIndex = len(arr)
		}
		go worker(startIndex, endIndex)
	}

	wg.Wait()
	return arrT2
}
type Number struct {
	a int
}

func main() {
	const n = 10000000

	arr := make([]Number, 0, n)
	for i := range n { // integer range loop: requires Go 1.22+
		arr = append(arr, Number{i})
	}

	returnInt := func(t Number, index int) int {
		return t.a
	}

	func() {
		defer timer("normal map")()
		Map(arr, returnInt)
	}()

	func() {
		defer timer("infinite goroutines map")()
		MapConcurrent(arr, returnInt)
	}()

	func() {
		defer timer("worker pool map")()
		MapConcurrentWorkerPool(arr, returnInt)
	}()
}

// timer records the current time and returns a function that, when called
// (typically via defer), prints how long has elapsed since.
func timer(name string) func() {
	start := time.Now()
	return func() {
		fmt.Printf("%s took %v\n", name, time.Since(start))
	}
}
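
// --- main_test.go ---
// The benchmarks below are assumed to live in a separate main_test.go in the
// same package (the pprof listing further down references main_test.go), so
// they need their own file header; a minimal sketch:
//
//	package main
//
//	import "testing"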
func BenchmarkMap(b *testing.B) {
	n := 10000000
	input := make([]Number, 0, n)
	for i := 0; i < n; i++ {
		input = append(input, Number{i})
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = Map(input, func(item Number, index int) Number {
			return Number{item.a * 2}
		})
	}
}

func BenchmarkMapConcurrentWorkerPool(b *testing.B) {
	n := 10000000
	input := make([]Number, 0, n)
	for i := 0; i < n; i++ {
		input = append(input, Number{i})
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = MapConcurrentWorkerPool(input, func(item Number, index int) Number {
			return Number{item.a * 2}
		})
	}
}

func BenchmarkMapConcurrent(b *testing.B) {
	n := 10000000
	input := make([]Number, 0, n)
	for i := 0; i < n; i++ {
		input = append(input, Number{i})
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = MapConcurrent(input, func(item Number, index int) Number {
			return Number{a: item.a * 2}
		})
	}
}
// Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -bench ^(BenchmarkMap|BenchmarkMapConcurrentWorkerPool|BenchmarkMapConcurrent)$ go-perf
// goos: darwin
// goarch: arm64
// pkg: go-perf
// BenchmarkMap-8                         86    13761745 ns/op    80003094 B/op         1 allocs/op
// BenchmarkMapConcurrentWorkerPool-8      1  2435593875 ns/op    80013424 B/op        32 allocs/op
// BenchmarkMapConcurrent-8                1  1676149917 ns/op   880347800 B/op  10000741 allocs/op
// PASS
// ok      go-perf 7.178s
// Total: 87.45s
// ROUTINE ======================== go-perf.BenchmarkMapConcurrentWorkerPool in /Users/mostafaelataby-cheviron/code/go-perf/main_test.go
// 0 630ms (flat, cum) 0.72% of Total
// . . 21:func BenchmarkMapConcurrentWorkerPool(b *testing.B) {
// . . 22: n := 10000000
// . . 23: input := make([]Number, 0, n)
// . . 24: for i := 0; i < n; i++ {
// . . 25: input = append(input, Number{i})
// . . 26: }
// . . 27:
// . . 28: b.ResetTimer()
// . . 29: for i := 0; i < b.N; i++ {
// . 630ms 30: _ = MapConcurrentWorkerPool(input, func(item Number, index int) Number {
// . . 31: return Number{item.a * 2}
// . . 32: })
// . . 33: }
// . . 34:}
// . . 35:
// ROUTINE ======================== go-perf.MapConcurrentWorkerPool[go.shape.struct { go-perf.a int },go.shape.struct { go-perf.a int }] in /Users/mostafaelataby-cheviron/code/go-perf/main.go
// 0 630ms (flat, cum) 0.72% of Total
// . . 40:func MapConcurrentWorkerPool[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 {
// . . 41: arrT2 := make([]T2, len(arr))
// . . 42:
// . . 43: numworkers := runtime.NumCPU()
// . . 44:
// . . 45: type indexT1 struct {
// . . 46: index int
// . . 47: t1 T1
// . . 48: }
// . . 49:
// . . 50: type indexT2 struct {
// . . 51: index int
// . . 52: t2 T2
// . . 53: }
// . . 54:
// . . 55: inputs := make(chan indexT1, numworkers)
// . . 56: results := make(chan indexT2, numworkers)
// . . 57:
// . . 58: var wg sync.WaitGroup
// . . 59: wg.Add(numworkers)
// . . 60:
// . . 61: worker := func() {
// . . 62: for t1 := range inputs {
// . . 63: t2 := f(t1.t1, t1.index)
// . . 64: results <- indexT2{t1.index, t2}
// . . 65: }
// . . 66:
// . . 67: wg.Done()
// . . 68: }
// . . 69:
// . . 70: for range numworkers {
// . . 71: go worker()
// . . 72: }
// . . 73:
// . . 74: go func() {
// . . 75: wg.Wait()
// . . 76: close(results)
// . . 77: }()
// . . 78:
// . . 79: go func() {
// . . 80: for i, t := range arr {
// . . 81: inputs <- indexT1{i, t}
// . . 82: }
// . . 83: close(inputs)
// . . 84: }()
// . . 85:
// . 630ms 86: for t2 := range results {
// . . 87: arrT2[t2.index] = t2.t2
// . . 88: }
// . . 89:
// . . 90: return arrT2
// . . 91:}
// ROUTINE ======================== go-perf.MapConcurrentWorkerPool[go.shape.struct { go-perf.a int },go.shape.struct { go-perf.a int }].func1 in /Users/mostafaelataby-cheviron/code/go-perf/main.go
// 0 17.52s (flat, cum) 20.03% of Total
// . . 61: worker := func() {
// . 10.15s 62: for t1 := range inputs {
// . . 63: t2 := f(t1.t1, t1.index)
// . 7.37s 64: results <- indexT2{t1.index, t2}
// . . 65: }
// . . 66:
// . . 67: wg.Done()
// . . 68: }
// . . 69:
// ROUTINE ======================== go-perf.MapConcurrentWorkerPool[go.shape.struct { go-perf.a int },go.shape.struct { go-perf.a int }].func3 in /Users/mostafaelataby-cheviron/code/go-perf/main.go
// 0 1.04s (flat, cum) 1.19% of Total
// . . 79: go func() {
// . . 80: for i, t := range arr {
// . 1.04s 81: inputs <- indexT1{i, t}
// . . 82: }
// . . 83: close(inputs)
// . . 84: }()
// . . 85:
// . . 86: for t2 := range results {
// ROUTINE ======================== go-perf.MapConcurrent[go.shape.struct { go-perf.a int },go.shape.struct { go-perf.a int }] in /Users/mostafaelataby-cheviron/code/go-perf/main.go
// 0 200ms (flat, cum) 0.23% of Total
// . . 21:func MapConcurrent[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 {
// . . 22: var wg sync.WaitGroup
// . . 23: wg.Add(len(arr))
// . . 24:
// . . 25: arrT2 := make([]T2, len(arr))
// . . 26:
// . . 27: for i, t := range arr {
// . 200ms 28: go func() {
// . . 29: t2 := f(t, i)
// . . 30: arrT2[i] = t2
// . . 31:
// . . 32: wg.Done()
// . . 33: }()
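
// For reference, here is the channel-based worker pool that the profile above
// was taken from, reconstructed from the listing. The name
// MapConcurrentWorkerPoolChannels is chosen here only to avoid clashing with
// the sharded version above; the body follows the listed source. Most of its
// time is spent on the inputs/results channel operations, which is what
// motivated the switch to slice shards.
func MapConcurrentWorkerPoolChannels[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 {
	arrT2 := make([]T2, len(arr))

	numworkers := runtime.NumCPU()

	// Pair each input and each result with its index so results can be written
	// back to the right slot regardless of completion order.
	type indexT1 struct {
		index int
		t1    T1
	}
	type indexT2 struct {
		index int
		t2    T2
	}

	inputs := make(chan indexT1, numworkers)
	results := make(chan indexT2, numworkers)

	var wg sync.WaitGroup
	wg.Add(numworkers)

	worker := func() {
		for t1 := range inputs {
			t2 := f(t1.t1, t1.index)
			results <- indexT2{t1.index, t2}
		}
		wg.Done()
	}

	for range numworkers {
		go worker()
	}

	// Close results once every worker has drained inputs and exited.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Feed the inputs from a separate goroutine so the main goroutine can
	// start collecting results immediately.
	go func() {
		for i, t := range arr {
			inputs <- indexT1{i, t}
		}
		close(inputs)
	}()

	for t2 := range results {
		arrT2[t2.index] = t2.t2
	}

	return arrT2
}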
// After updating the worker pool to work on contiguous shards of the slice instead of channels:
// Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -bench ^(BenchmarkMap|BenchmarkMapConcurrentWorkerPool|BenchmarkMapConcurrent)$ go-perf
// goos: darwin
// goarch: arm64
// pkg: go-perf
// BenchmarkMap-8                         68    16442178 ns/op    80003086 B/op         1 allocs/op
// BenchmarkMapConcurrentWorkerPool-8    213     5634531 ns/op    80003768 B/op        11 allocs/op
// BenchmarkMapConcurrent-8                1  1601082708 ns/op   881047088 B/op  10002197 allocs/op
// PASS
// ok go-perf 5.310s