Created
December 30, 2020 02:00
-
-
Save jeremyforan/313d21f898158dad4c2863f0a95403ba to your computer and use it in GitHub Desktop.
Batching with timer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)
// Message is a single item stored in a MessageBatch.
type Message struct {
	// Content is the message payload.
	Content string
	// Id identifies the producer of the message.
	Id string
}

// MessageBatch is a mutex-guarded, growable buffer of messages that can be
// shared between goroutines. The zero value is an empty batch ready for use.
//
// A MessageBatch contains a sync.Mutex and therefore must not be copied after
// first use; share it through a *MessageBatch.
type MessageBatch struct {
	mu       sync.Mutex // guards Messages
	Messages []Message
}

// push appends message to the end of the batch.
func (mb *MessageBatch) push(message Message) {
	mb.mu.Lock()
	defer mb.mu.Unlock()
	mb.Messages = append(mb.Messages, message)
}

// empty discards every message currently held in the batch.
func (mb *MessageBatch) empty() {
	mb.mu.Lock()
	defer mb.mu.Unlock()
	// A nil slice is a valid empty slice for append, len and fmt, and avoids
	// allocating a new backing array on every reset.
	mb.Messages = nil
}

// mLength reports how many messages the batch holds at the time of the call.
func (mb *MessageBatch) mLength() int {
	mb.mu.Lock()
	defer mb.mu.Unlock()
	return len(mb.Messages)
}

// String formats the buffered messages with fmt.Sprint while holding the lock.
//
// The method has a pointer receiver, so only *MessageBatch implements
// fmt.Stringer: printing a MessageBatch value bypasses this method and its
// locking.
func (mb *MessageBatch) String() string {
	mb.mu.Lock()
	defer mb.mu.Unlock()
	return fmt.Sprint(mb.Messages)
}
// wait is the longest time allowed to pass between two batch publications.
const wait = 10 * time.Second
func main(){ | |
var messageBucket MessageBatch | |
names := []string{"sara","matt","nicole","chris","elena","bob","seven","grizzly","stu","gomez"} | |
filled := make(chan bool) | |
fmt.Println("start") | |
// Simulate 10 agents | |
for x:=0; x<9; x++ { | |
//use a name to easier identify threads | |
name := names[x] | |
go func(id string) { | |
// seed start time for thread | |
rand.Seed(time.Now().UnixNano()) | |
var mess Message | |
for { | |
// give me a random letter of the alphabet | |
var singleRune = string(rand.Intn(26) + 97) | |
mess = Message{Content: singleRune, Id: id} | |
fmt.Print(mess, " ") | |
// send the message to the out bound pipe | |
messageBucket.push(mess) | |
// have thread sleep some random amount of time | |
time.Sleep(time.Duration(rand.Intn(15))*time.Second) | |
} | |
}(name) | |
} | |
// have go rountine check if the message buffer is filled, if so, send message to publish batch of messages | |
go func(fill chan bool, b *MessageBatch) { | |
for { | |
if b.mLength() > 10 { | |
filled <- true | |
} | |
time.Sleep(time.Duration(1 * time.Second)) | |
} | |
}(filled, &messageBucket) | |
// this is used to show how much time is passed since the last batch publishing | |
var elapsed time.Duration | |
// timer will be used to make sure too much time doesnt pass before records are published | |
timer := time.NewTimer(wait) | |
now := time.Now() | |
for { | |
select { | |
// the timer has timed out | |
case <-timer.C: | |
elapsed = time.Since(now) | |
fmt.Printf("\n\n %fs ===== Ticked %d \n", elapsed.Seconds(), messageBucket.mLength()) | |
fmt.Println(messageBucket) | |
fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") | |
messageBucket.empty() | |
timer.Reset(wait) | |
now = time.Now() | |
// bucket is full | |
case <-filled: | |
timer.Reset(wait) | |
elapsed = time.Since(now) | |
fmt.Printf("\n\n %fs ===== Filled %d \n", elapsed.Seconds(), messageBucket.mLength()) | |
fmt.Println(messageBucket) | |
fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") | |
messageBucket.empty() | |
timer.Reset(wait) | |
now = time.Now() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment