Skip to content

Instantly share code, notes, and snippets.

@bjwschaap
Created June 16, 2020 06:10
Show Gist options
  • Save bjwschaap/77b087ef7b083ed4d13f8aa972e3db59 to your computer and use it in GitHub Desktop.
Save bjwschaap/77b087ef7b083ed4d13f8aa972e3db59 to your computer and use it in GitHub Desktop.
Updater/worker pool idea
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
// work is a single job for an updater: one payload that has to be delivered
// to every controller in the list.
type work struct {
	payload     []byte   // content handed to each worker spawned for the job
	controllers []string // controller IDs; one worker is spawned per entry
}
// handler carries the state for one delivery to a single controller.
type handler struct {
	id      string    // controller the payload is sent to
	payload string    // content to send
	result  chan bool // receives the outcome of the delivery
}
// workQueueSize is the number of jobs wchan can buffer before a send blocks.
const workQueueSize = 5

// wchan is the queue of pending jobs.
var wchan = make(chan work, workQueueSize)
// main runs a small demo of the updater/worker pool: it starts two updater
// goroutines that share wchan, enqueues three jobs, lets them run for ten
// seconds, then cancels the context and waits for both updaters to return
// before exiting.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Track the updater goroutines so shutdown waits for them to actually
	// return instead of sleeping for a guessed amount of time.
	var updaters sync.WaitGroup
	for id := 1; id <= 2; id++ {
		updaters.Add(1)
		go func(id int) {
			defer updaters.Done()
			updater(ctx, id, wchan)
		}(id)
	}

	jobs := []work{
		{
			controllers: []string{"1234", "5678", "2eewe233"},
			payload:     []byte("content"),
		},
		{
			controllers: []string{"sdaddsd", "dddd8d87d"},
			payload:     []byte("more content..."),
		},
		{
			controllers: []string{"cpdisa9sa-09", "a9d0a98ad"},
			payload:     []byte("finally"),
		},
	}
	for _, job := range jobs {
		wchan <- job
	}

	// Give the demo jobs time to be processed before shutting down.
	time.Sleep(10 * time.Second)

	cancel()
	updaters.Wait()
}
// updater consumes jobs from w until ctx is cancelled or w is closed.
//
// For every job it starts one worker goroutine per controller ID, passing
// each the job payload (converted to a string) and a four-second timeout, and
// waits for all of them to return before picking up the next job. id is only
// used to label this updater's log entries.
func updater(ctx context.Context, id int, w <-chan work) {
	log.WithField("id", id).Info("updater routine starting")
	for {
		select {
		case wt, ok := <-w:
			if !ok {
				// A closed channel is always ready to receive and yields
				// zero-value jobs, so without this check the loop would
				// spin forever once w is closed.
				log.WithField("id", id).Info("work channel closed, stopping updater")
				return
			}
			log.WithFields(log.Fields{
				"pending": len(w),
				"id":      id,
			}).Info("received job")
			log.WithFields(log.Fields{
				"id":      id,
				"workers": len(wt.controllers),
			}).Info("spawning workers")

			// Convert the payload once; every worker gets the same string.
			payload := string(wt.payload)
			var wg sync.WaitGroup
			for _, u := range wt.controllers {
				wg.Add(1)
				go worker(u, payload, &wg, 4*time.Second)
			}
			log.Info("waiting until workers are done")
			wg.Wait()
			log.Info("done.")
		case <-ctx.Done():
			log.Info("stopping updater")
			return
		}
	}
}
// worker delivers payload to the controller identified by id and waits for
// the outcome.
//
// It runs a handler in its own goroutine and then blocks until either a
// result arrives on the handler's result channel or timeout elapses, logging
// which of the two happened. wg.Done is called when worker returns.
func worker(id string, payload string, wg *sync.WaitGroup, timeout time.Duration) {
	// Registered first so the WaitGroup is released on every exit path.
	defer wg.Done()
	log.WithField("controller_id", id).Info("worker started")
	// defer client.Unsubscribe

	h := handler{
		id:      id,
		payload: payload,
		// Buffered so the handler goroutine can still deliver its result and
		// exit after worker has stopped waiting.
		result: make(chan bool, 1),
	}
	// subscribe (client.Subscribe)
	go h.handle()

	// Use an explicit timer so it can be stopped as soon as the result
	// arrives instead of lingering until it fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// wait for confirmation
	select {
	case r := <-h.result:
		log.WithField("result", r).Infof("worker for %s done", id)
	case <-timer.C:
		log.WithField("after", timeout).Errorf("timeout waiting for %s", id)
	}
}
// handle simulates delivering h.payload to controller h.id: it sleeps for a
// random whole number of seconds in [0, 5), logs the delivery, and then
// reports success by sending true on h.result.
func (h *handler) handle() {
	// Draw from a private generator rather than reseeding the shared
	// math/rand source on every call; handlers run concurrently and would
	// otherwise keep overwriting each other's seed.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	d, err := time.ParseDuration(fmt.Sprintf("%ds", rng.Intn(5)))
	if err != nil {
		// ParseDuration returns a zero duration on failure, so the payload
		// is simply sent without delay.
		log.WithError(err).Warn("invalid simulated delay")
	}
	time.Sleep(d)

	log.WithField("payload", h.payload).Infof("sending payload to %s", h.id)
	h.result <- true
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment