Last active
July 28, 2018 02:21
-
-
Save abitofhelp/0af1ac31c6a5c18987d630c82838e47d to your computer and use it in GitHub Desktop.
This gist shows the use of a buffered channel with multiple senders and receivers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide. | |
// Use of this source code is governed by a MIT license that can be found in the LICENSE file. | |
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Package main is the entry point for the application | |
// and is responsible for configuring the environment. | |
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"sync" | |
"sync/atomic" | |
"time" | |
) | |
const ( | |
// Constant kMaxSenders is the number of goroutines that will be sending | |
// values into the channel in parallel. | |
kMaxSenders = 25 | |
// Constant kMaxReceivers is the number of goroutines that will be receiving | |
// values from the channel in parallel. | |
kMaxReceivers = 100 | |
// Constant kMaxTransmissionsPerSender is the number of items that will be | |
// sent into the channel by each sending goroutine. | |
kMaxTransmissionsPerSender = 1000 | |
) | |
// Function main is the entry point for the application and is responsible | |
// for configuring its environment. | |
func main() { | |
// Variable wg is main's WaitGroup, which detects when all of the | |
// goroutines that were launched have completed. | |
var wg sync.WaitGroup | |
// Start our timer... | |
start := time.Now() | |
// Doing something... | |
doIt(&wg) | |
// Wait here until all goroutines have completed their work. | |
wg.Wait() | |
// Show the duration. | |
fmt.Printf("Elapsed: %s", time.Since(start)) | |
} | |
// Function receiver uses a for loop over a range to receive a value from the channel. | |
// Parameter ch is a unidirectional channel for reading integer values. | |
// Parameter id is the unique identifier assigned to each receiving goroutine. | |
// Parameter counter is an atomic count of the number of messages that were received | |
// through the channel. | |
func receiver(id uint64, ch <-chan uint64, counter *uint64) { | |
for val := range ch { | |
// Increment the received counter, atomically. | |
cnt := atomic.AddUint64(counter, 1) | |
fmt.Println("(", cnt, ") Receiver(", id, "), value: ", val) | |
} | |
} | |
// Function sender places 1000 integers into a unidirectional channel for sending integer | |
// values. | |
// Parameter ch is a unidirectional channel for reading integer values. | |
// Parameter id is the unique identifier assigned to each sending goroutine. | |
// Parameter counter is an atomic count of the number of messages that were sent | |
// through the channel. | |
func sender(id uint64, ch chan<- uint64, counter *uint64) { | |
for i := uint64(0); i < kMaxTransmissionsPerSender; i++ { | |
// Generate a random value to broadcast to the receivers. | |
val := rand.Uint64() | |
// Get the current value of the counter, atomically, and show a status message. | |
cnt := atomic.LoadUint64(counter) | |
fmt.Println("(", cnt, ") Sender(", id, "), broadcasting: ", val) | |
// Send the value through the channel. | |
ch <- val | |
// Increment the sent counter, atomically. | |
atomic.AddUint64(counter, 1) | |
} | |
} | |
// Function doIt does the work. It creates the channel, | |
// launches the goroutines and returns to main(). | |
// Please note that a single sender is broadcasting values to | |
// two receivers over the channel. | |
func doIt(wg *sync.WaitGroup) { | |
// Atomic counter of the number of items sent. | |
var sentCounter uint64 = 1 | |
// Atomic counter of the number of items received. | |
var receivedCounter uint64 = 0 | |
// Variable wgs is a WaitGroup so we know when the senders have completed. | |
var wgs sync.WaitGroup | |
// Create the bidirectional channel for communications between | |
// the goroutines. | |
ch := make(chan uint64, 100) | |
// Launch multiple receivers and set up the WaitGroup to account for them. | |
for r := uint64(0); r < kMaxReceivers; r++ { | |
wg.Add(1) | |
go func(id uint64) { | |
defer wg.Done() | |
receiver(id, ch, &receivedCounter) | |
}(r) | |
} | |
// Launch multiple senders and set up the WaitGroup to account for them. | |
for s := uint64(0); s < kMaxSenders; s++ { | |
wgs.Add(1) | |
go func(id uint64) { | |
defer wgs.Done() | |
sender(id, ch, &sentCounter) | |
}(s) | |
} | |
// Wait here until all sending goroutines have completed their work. | |
wgs.Wait() | |
// Close the channel there are no more values to send... | |
// It signals the receivers that there are no more values, | |
// so when the channel is empty, they can return and the application | |
// can terminate. | |
close(ch) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment