Last active
September 15, 2018 01:15
-
-
Save abitofhelp/b5f3eebf7e316b96b2cdc4188db2d742 to your computer and use it in GitHub Desktop.
This gist implements a streaming process: chunks of data are read from a file in one goroutine and passed through a channel to another goroutine, which writes them to a new file. It uses a reference-counted pool of buffers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide. | |
// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. | |
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Package refcnt implements a reference-counted []byte buffer.
package refcnt | |
import ( | |
"errors" | |
"fmt" | |
) | |
/*************************************************** TYPES ************************************************************/ | |
// Buffer maintains a []byte buffer with a reference counter. When the count drops to zero, the
// buffer is reset and returned to the pool so that it can be reused.
type Buffer struct {
	// ReferenceCounter supplies IncrementReferenceCount, DecrementReferenceCount and
	// SetInstance, which the pool uses to track and recycle this Buffer.
	ReferenceCounter
	// Buf holds the data. Its length is not preserved across reuse: the pool's reset
	// hook truncates it to zero before the Buffer is put back into the pool.
	Buf []byte
}
/***************************************************** VAR ************************************************************/ | |
// The buffer size if one is not provided. | |
var bufferSize = uint64(1024 << 9) | |
// Buffer pool | |
var bufferPool = NewPool( | |
func(counter ReferenceCounter) ReferenceCountable { | |
br := new(Buffer) | |
br.ReferenceCounter = counter | |
br.Buf = make([]byte, bufferSize) | |
return br | |
}, reset) | |
/*********************************************** EXPORTED METHODS *****************************************************/ | |
// Function NewBuffer creates or retrieves a Buffer. | |
func NewBuffer(bufSize uint64) *Buffer { | |
if bufSize == 0 { | |
bufferSize = bufSize | |
} | |
e := acquire() | |
return e | |
} | |
/******************************************* INTERNAL FUNCTIONS/METHODS ***********************************************/ | |
// Function to get a Buffer instance. | |
func acquire() *Buffer { | |
return bufferPool.Get().(*Buffer) | |
} | |
// Method to reset the buffer inside the Buffer object. | |
// The reference countable pool uses this method. | |
func (e *Buffer) reset() { | |
// At this time, we just reset the length to zero. | |
e.Buf = e.Buf[:0] | |
} | |
// Function to reset the Buffer object. | |
// The reference countable pool uses this method. | |
func reset(i interface{}) error { | |
obj, ok := i.(*Buffer) | |
if !ok { | |
errors.New(fmt.Sprintf("illegal object sent to resetRefCntBuffer: %v\n", i)) | |
} | |
obj.reset() | |
return nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide. | |
// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. | |
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// The implementation of the pool was inspired by the following blog: | |
// http://www.akshaydeo.com/blog/2017/12/23/How-did-I-improve-latency-by-700-percent-using-syncPool/ | |
// Package refcnt implements a reference count based []byte buffer. | |
package refcnt | |
import ( | |
"reflect" | |
"sync" | |
"sync/atomic" | |
) | |
// ReferenceCountable is the contract for objects handed out by a ReferenceCountedPool:
// the pool records the instance and manages its reference count through these methods.
type ReferenceCountable interface {
	// IncrementReferenceCount adds one reference to the object.
	IncrementReferenceCount()
	// DecrementReferenceCount removes one reference from the object.
	DecrementReferenceCount()
	// SetInstance records the concrete object that owns this reference count.
	SetInstance(i interface{})
}
// Type ReferenceCountedPool defines the pool. | |
type ReferenceCountedPool struct { | |
pool *sync.Pool | |
factory func() ReferenceCountable | |
returned uint32 | |
allocated uint32 | |
referenced uint32 | |
} | |
// Function NewPool creates a new reference counting pool of Buffers. | |
func NewPool(factory func(referenceCounter ReferenceCounter) ReferenceCountable, reset func(interface{}) error) *ReferenceCountedPool { | |
p := new(ReferenceCountedPool) | |
p.pool = new(sync.Pool) | |
p.pool.New = func() interface{} { | |
// Incrementing allocated count | |
atomic.AddUint32(&p.allocated, 1) | |
c := factory(ReferenceCounter{ | |
count: new(uint32), | |
destination: p.pool, | |
released: &p.returned, | |
reset: reset, | |
id: p.allocated, | |
}) | |
return c | |
} | |
return p | |
} | |
// Method to get new object | |
func (p *ReferenceCountedPool) Get() ReferenceCountable { | |
c := p.pool.Get().(ReferenceCountable) | |
c.SetInstance(c) | |
atomic.AddUint32(&p.referenced, 1) | |
c.IncrementReferenceCount() | |
return c | |
} | |
// Method to return reference counted pool stats | |
func (p *ReferenceCountedPool) Stats() map[string]interface{} { | |
return map[string]interface{}{"allocated": p.allocated, "referenced": p.referenced, "returned": p.returned} | |
} | |
// ReferenceCounter is meant to be embedded in a pooled object. It carries the shared
// reference count plus everything needed to reset the object and return it to its pool.
// Increment and decrement references carefully, especially when the object is handed
// between goroutines.
type ReferenceCounter struct {
	Instance    interface{}             `sql:"-" json:"-" yaml:"-"` // object that embeds this counter; set by SetInstance
	id          uint32                  `sql:"-" json:"-" yaml:"-"` // allocation sequence number assigned by the pool
	count       *uint32                 `sql:"-" json:"-" yaml:"-"` // reference count, shared by all copies of this value
	released    *uint32                 `sql:"-" json:"-" yaml:"-"` // pool's "returned" statistic
	reset       func(interface{}) error `sql:"-" json:"-" yaml:"-"` // hook run on Instance before it is put back
	destination *sync.Pool              `sql:"-" json:"-" yaml:"-"` // pool that Instance is put back into
}
// Method to increment a reference | |
func (r ReferenceCounter) IncrementReferenceCount() { | |
atomic.AddUint32(r.count, 1) | |
} | |
// Method to decrement a reference | |
// If the reference count goes to zero, the object is put back inside the pool | |
func (r ReferenceCounter) DecrementReferenceCount() { | |
if atomic.LoadUint32(r.count) == 0 { | |
panic("this should not happen =>" + reflect.TypeOf(r.Instance).String()) | |
} | |
if atomic.AddUint32(r.count, ^uint32(0)) == 0 { | |
atomic.AddUint32(r.released, 1) | |
if err := r.reset(r.Instance); err != nil { | |
panic("error while resetting an instance => " + err.Error()) | |
} | |
r.destination.Put(r.Instance) | |
r.Instance = nil | |
} | |
} | |
// Method to set the current instance | |
func (r *ReferenceCounter) SetInstance(i interface{}) { | |
r.Instance = i | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide. | |
// Use of this source code is governed by a MIT license that can be found in the LICENSE file. | |
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Package main streams a file through a buffered channel and writes it to a new file.
// A data race arose in the for loop that reads data from the input file and sends it to the channel.
// To reduce the time spent allocating memory, I implemented a reference-counting pool of buffers.
// A quick test with a 3.9GB binary file reduced the time from 5.3s to 3.4s.
// Package main is the entry point for the application and is responsible for configuring the environment.
package main | |
import ( | |
"errors" | |
"fmt" | |
"github.com/abitofhelp/junk/refcnt" | |
"io" | |
"log" | |
"os" | |
"sync" | |
"time" | |
) | |
/************************************************* CONSTANTS **********************************************************/ | |
const (
	// kMaxBufferSize is the maximum number of bytes read from the input file in each
	// iteration of the reading loop (512 KiB).
	kMaxBufferSize = 1024 << 9

	// kMaxChannelSize is the capacity of the buffered channel, counted in buffers.
	kMaxChannelSize = 17

	// kFromPath is the file system path of the file that will be read.
	//kFromPath = "/home/mjgardner/Downloads/Shills.rar" // 56.2GB
	//kFromPath = "/home/mjgardner/Downloads/sit.exe" // 220MB
	kFromPath = "/home/mjgardner/Downloads/ABigFile.zip" // 3.9GB

	// kToPath is the file system path where the data received from the channel is written.
	kToPath = "/home/mjgardner/Downloads/ANewBigFile.zip"
)
/**********************************************************************************************************************/ | |
/**************************************************** MAIN ************************************************************/ | |
// Function configureInterfaces creates and configures the interfaces used by the application, such as the file system | |
// and channel. | |
// Parameter channelSize is the number of []byte that can be placed into the buffered channel. | |
// Parameter fromPath is the file system path to the file that will be read. | |
// Parameter toPath is the file system path to the file that will be created. | |
// Returns a configured channel, input file handle, and output file handle, and no error instance on success; Otherwise, | |
// all items will be nil, except for the error. | |
func configureInterfaces(channelSize uint64, fromPath string, toPath string) (chan *refcnt.Buffer, *os.File, *os.File, error) { | |
// Create the channel that will be used by the file reader and file write goroutines. | |
ch := make(chan *refcnt.Buffer, channelSize) | |
if ch == nil { | |
err := errors.New("failed to create the channel") | |
log.Fatal(err) | |
return nil, nil, nil, err | |
} | |
// Open the file at fromPath for reading... | |
ifile, err := os.Open(fromPath) | |
if err != nil { | |
log.Fatal(err) | |
return nil, nil, nil, err | |
} | |
// Create the output file at toPath for writing... | |
ofile, err := os.Create(toPath) | |
if err != nil { | |
log.Fatal(err) | |
return nil, nil, nil, err | |
} | |
return ch, ifile, ofile, err | |
} | |
// Function main is the entry point for the application and is responsible for configuring its environment. | |
func main() { | |
// Variable wg is main's WaitGroup, which detects when all of the goroutines that were launched have completed. | |
var wg sync.WaitGroup | |
// Start our timer... | |
start := time.Now() | |
// Create and configure the interfaces used by the application, such as the file system and channel. | |
ch, ifile, ofile, err := configureInterfaces(kMaxChannelSize, kFromPath, kToPath) | |
if err != nil { | |
// Error has already been logged. | |
return | |
} | |
// Automatically close the files when exiting. | |
defer ifile.Close() | |
defer ofile.Close() | |
fmt.Println("Starting processing...\n") | |
// File writing goroutine. | |
wg.Add(1) | |
go func(ch <-chan *refcnt.Buffer, of *os.File) { | |
defer wg.Done() | |
// Receive the input file's data through the buffered channel. | |
receiver(ch, of) | |
}(ch, ofile) | |
// File reading goroutine | |
wg.Add(1) | |
go func(ch chan<- *refcnt.Buffer, ifile *os.File) { | |
defer wg.Done() | |
// Write the input file's data to the buffered channel. | |
sender(ifile, ch) | |
}(ch, ifile) | |
// Wait here until all goroutines have completed their work. | |
wg.Wait() | |
// Show the duration. | |
fmt.Printf("\nDone processing...\nElapsed: %s", time.Since(start)) | |
} | |
// Function sender write the input file's data to the buffered channel. | |
// Parameter if is the input file's handle. | |
// Parameter ch is the buffered channel to which data will be written. | |
func sender(ifile *os.File, ch chan<- *refcnt.Buffer) error { | |
// Cumulative counters | |
nBytes := uint64(0) | |
nChunks := uint64(0) | |
fmt.Println("\tSender is starting to read the input file...") | |
// Loop through the input file reading chunks of data, which is sent over the channel. | |
for { | |
buf := refcnt.NewBuffer(kMaxBufferSize) | |
// Read a chunk of data from the file... | |
n, err := ifile.Read(buf.Buf[:cap(buf.Buf)]) | |
// Did we read any data from the file? Was there an error? | |
if n == 0 { | |
if err == nil { | |
// No data and no error; Keep going... | |
continue | |
} | |
if err == io.EOF { | |
// End of file, so exit the loop... | |
break | |
} | |
// Ouch! Log the error and exit. | |
log.Fatal(err) | |
return err | |
} | |
// Update the cumulative counters. | |
nChunks++ | |
nBytes += uint64(n) | |
// Send the data over the channel. | |
ch <- buf | |
} | |
// Signal the receiving goroutines that there is no more data. | |
fmt.Println("\tSender is closing the channel to signal the receiver that no more data is coming, and exiting...") | |
close(ch) | |
// When there is no more data to process, display the sender's status. | |
fmt.Printf("\tSent:\t\tnBytes: %d, nChunks: %d\n", nBytes, nChunks) | |
return nil | |
} | |
// Receive the input file's data through the buffered channel. | |
// Parameter of is the output file's handle. | |
// Parameter ch is the buffered channel from which data will be read. | |
func receiver(ch <-chan *refcnt.Buffer, of *os.File) { | |
// Cumulative counters | |
nBytes := uint64(0) | |
nChunks := uint64(0) | |
fmt.Println("\tReceiver is waiting for data in the channel, so it can write it to the output file...") | |
// While there is data to read in the channel, we will get it and writing it to the output file. | |
for data := range ch { | |
// Determine the length of the chunk of data that is available. | |
n := len(data.Buf) | |
// Write the chunk to the output file. | |
of.Write(data.Buf[:n]) | |
// Update the cumulative counters... | |
nBytes += uint64(n) | |
nChunks++ | |
data.DecrementReferenceCount() | |
} | |
fmt.Println("\tReceiver has emptied the channel and is exiting...") | |
// When there is no more data to process, display the receiver's status. | |
fmt.Printf("\tReceived:\tnBytes: %d, nChunks: %d\n", nBytes, nChunks) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment