Code looks fine to me. There might be a hidden race condition. The p.Count++ and p.Count-- are atomic ops. For every ++, its corresponding -- should be called. TBD.
package pool
import (
"sync"
"fmt"
)
// ChanMessage is the unit of work delivered to a worker over its channel.
//
// The message carries only the task itself; the *sync.WaitGroup handed to the
// task is created by the worker for each message rather than being stored
// here.
type ChanMessage struct {
	// f is the task to execute. It receives the WaitGroup the worker waits on
	// after calling it.
	f func(*sync.WaitGroup)
}
// Worker is one long-lived goroutine's mailbox plus its bookkeeping fields.
type Worker struct {
	// c is the channel on which the worker receives tasks.
	c chan *ChanMessage

	// mtx and isBusy are initialised when the worker is created but are not
	// read or written anywhere else in the code visible in this file.
	mtx    sync.Mutex
	isBusy bool
}
// Pool dispatches tasks to a fixed set of worker goroutines.
type Pool struct {
	// once ensures the workers are started a single time.
	once *sync.Once

	// mtx is held by Run while it inspects Count, scans the workers and
	// advances RoundRobinCounter.
	mtx *sync.Mutex

	// Size is the number of workers created for the pool.
	Size int

	// workers holds one entry per started worker goroutine.
	workers []*Worker

	// Count is the number of tasks currently being executed by workers: it is
	// incremented when a worker picks up a task and decremented once that
	// task's WaitGroup has been released.
	Count int

	// RoundRobinCounter is the index of the worker most recently chosen by
	// Run's fallback path (after being reduced modulo Size).
	RoundRobinCounter int
}
// createWorkers starts p.Size worker goroutines and records them in
// p.workers. It is guarded by p.once, so calling it more than once has no
// further effect.
//
// Each worker loops over its channel. For every message it bumps p.Count,
// runs the task with a fresh WaitGroup whose counter is 1, waits for that
// WaitGroup to be released, and then drops p.Count again. A task therefore
// MUST call wg.Done() exactly once, otherwise its worker blocks forever.
//
// p.Count is also read by Run while holding p.mtx, so the workers take the
// same mutex around every update; unsynchronised ++/-- on a plain int is a
// data race and can lose updates.
func (p *Pool) createWorkers() {
	p.once.Do(func() {
		for i := 0; i < p.Size; i++ {
			w := &Worker{
				// Buffer of one lets Run hand over a task without blocking
				// while the worker is between tasks.
				c: make(chan *ChanMessage, 1),
			}
			go func(w *Worker) {
				for m := range w.c {
					p.mtx.Lock()
					p.Count++
					p.mtx.Unlock()

					wg := &sync.WaitGroup{}
					wg.Add(1)
					m.f(wg)
					// The task may finish its work on other goroutines; the
					// worker stays occupied until it signals completion.
					wg.Wait()

					p.mtx.Lock()
					p.Count--
					p.mtx.Unlock()
				}
			}(w)
			p.workers = append(p.workers, w)
		}
	})
}
// CreatePool returns a Pool with size workers, all of which have been started
// by the time it returns.
//
// RoundRobinCounter starts at size+1; Run advances it modulo Size before
// using it as an index into the worker list.
func CreatePool(size int) *Pool {
	pool := new(Pool)
	pool.Size = size
	pool.mtx = new(sync.Mutex)
	pool.once = new(sync.Once)
	pool.RoundRobinCounter = size + 1

	pool.createWorkers()
	return pool
}
// Run schedules z for execution.
//
// z receives a *sync.WaitGroup whose counter is 1 and must call Done on it
// once its work is complete. The WaitGroup is never nil, so z does not need
// to special-case how it was scheduled.
//
// Scheduling proceeds in three steps:
//  1. If p.Count has reached p.Size, z runs on a one-off goroutine outside
//     the pool.
//  2. Otherwise z is offered to each worker in turn with a non-blocking send;
//     the first worker with room in its channel takes it.
//  3. If no worker has room, the next worker in round-robin order is chosen
//     and the send is completed from a helper goroutine so that Run itself
//     never blocks.
func (p *Pool) Run(z func(*sync.WaitGroup)) {
	p.mtx.Lock()
	if p.Count >= p.Size {
		p.mtx.Unlock()
		// Pool is saturated, so run the task on its own goroutine. Pool
		// workers require the task to call wg.Done(), so hand it a live
		// WaitGroup here as well; passing nil would make that same call
		// panic with a nil-pointer dereference.
		wg := &sync.WaitGroup{}
		wg.Add(1)
		go z(wg)
		return
	}

	m := &ChanMessage{f: z}
	for _, w := range p.workers {
		select {
		case w.c <- m:
			p.mtx.Unlock()
			return
		default:
			// This worker's channel is full; try the next one.
		}
	}

	// Every worker channel is full: pick the next worker in round-robin order.
	p.RoundRobinCounter = (p.RoundRobinCounter + 1) % p.Size
	w := p.workers[p.RoundRobinCounter]
	p.mtx.Unlock()

	// The send blocks until the chosen worker drains its channel, so do it
	// off the caller's goroutine and outside the lock.
	go func() {
		w.c <- m
	}()
}