Skip to content

Instantly share code, notes, and snippets.

@jeremyforan
Created December 30, 2020 02:00
Show Gist options
  • Save jeremyforan/313d21f898158dad4c2863f0a95403ba to your computer and use it in GitHub Desktop.
Save jeremyforan/313d21f898158dad4c2863f0a95403ba to your computer and use it in GitHub Desktop.
Batching with timer
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Message is one unit of data handed to a MessageBatch.
type Message struct {
	Content string // payload text
	Id      string // identifier of whoever produced the message
}

// MessageBatch is a growable list of messages guarded by a mutex, so its
// methods may be called from several goroutines at once. The zero value is an
// empty batch that is ready to use.
type MessageBatch struct {
	mu       sync.Mutex // guards Messages
	Messages []Message
}

// push appends message to the end of the batch.
func (b *MessageBatch) push(message Message) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.Messages = append(b.Messages, message)
}

// empty throws away everything in the batch, leaving an empty, non-nil slice.
func (b *MessageBatch) empty() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.Messages = make([]Message, 0)
}

// mLength reports how many messages the batch holds right now.
func (b *MessageBatch) mLength() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.Messages)
}

// String formats the batched messages with fmt's default verb, e.g.
// "[{a sara} {b matt}]". It is only reachable through a *MessageBatch.
func (b *MessageBatch) String() string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return fmt.Sprintf("%v", b.Messages)
}
// wait is how long the timer in main runs before it forces the current batch
// to be published.
const wait = 10 * time.Second
// main simulates a group of agents that push single-letter messages into a
// shared batch. The batch is published (printed and cleared) as soon as it
// holds more than ten messages or when wait has elapsed since the previous
// publish, whichever comes first. The program runs until it is killed; none
// of the goroutines it starts ever return.
func main() {
	var messageBucket MessageBatch
	names := []string{"sara", "matt", "nicole", "chris", "elena", "bob", "seven", "grizzly", "stu", "gomez"}
	filled := make(chan bool)

	fmt.Println("start")

	// Start one agent per name. Ranging over the slice keeps the agent count
	// in step with the list (the old counted loop stopped one short).
	for i, name := range names {
		// Offset the seed by the index so agents launched within the same
		// clock tick still produce different sequences.
		go runAgent(name, time.Now().UnixNano()+int64(i), &messageBucket)
	}

	// Watch the batch size in the background and signal when it is full.
	go watchBatch(filled, &messageBucket)

	// The timer makes sure too much time does not pass before messages are
	// published; last is used to show the time since the previous publish.
	timer := time.NewTimer(wait)
	last := time.Now()
	for {
		var reason string
		select {
		case <-timer.C: // the timer has timed out
			reason = "Ticked"
		case <-filled: // the batch is full
			reason = "Filled"
		}

		// Take the messages and clear the batch in one critical section so
		// nothing pushed while we are printing is silently thrown away.
		batch := drainBatch(&messageBucket)
		fmt.Printf("\n\n %fs ===== %s %d \n", time.Since(last).Seconds(), reason, len(batch))
		// Print the drained slice rather than the MessageBatch value: passing
		// the struct by value copies its mutex and skips the pointer-receiver
		// String method.
		fmt.Println(batch)
		fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

		restartTimer(timer, wait)
		last = time.Now()
	}
}

// runAgent pushes a random lowercase letter tagged with id into b, then sleeps
// for a random whole number of seconds in [0, 15), forever. Each agent owns
// its generator, so no shared random state is reseeded by other goroutines.
func runAgent(id string, seed int64, b *MessageBatch) {
	rng := rand.New(rand.NewSource(seed))
	for {
		mess := Message{Content: randomLetter(rng), Id: id}
		fmt.Print(mess, " ")
		b.push(mess)
		time.Sleep(time.Duration(rng.Intn(15)) * time.Second)
	}
}

// randomLetter returns one lowercase ASCII letter drawn from rng.
func randomLetter(rng *rand.Rand) string {
	// Convert through rune: string(int) is rejected by go vet.
	return string(rune('a' + rng.Intn(26)))
}

// watchBatch checks once a second whether b holds more than ten messages and,
// if so, signals on fill.
func watchBatch(fill chan<- bool, b *MessageBatch) {
	const threshold = 10
	for {
		if b.mLength() > threshold {
			// Non-blocking send: if the publisher is busy the signal is
			// dropped and the size is re-checked on the next pass, so a stale
			// "filled" never arrives after the batch was already flushed.
			select {
			case fill <- true:
			default:
			}
		}
		time.Sleep(time.Second)
	}
}

// drainBatch removes and returns every message currently in b under a single
// lock acquisition, leaving b with an empty slice.
func drainBatch(b *MessageBatch) []Message {
	b.mu.Lock()
	defer b.mu.Unlock()
	drained := b.Messages
	b.Messages = []Message{}
	return drained
}

// restartTimer re-arms t to fire after d. It stops the timer and discards any
// tick that is already queued first, so a stale expiry cannot trigger an
// immediate publish on toolchains older than Go 1.23.
func restartTimer(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(d)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment