Created
September 15, 2018 03:04
-
-
Save abitofhelp/40d22fd85fdd6b412b4527ba5bc54a48 to your computer and use it in GitHub Desktop.
This gist implements a streaming process where one goroutine reads a file in chunks and writes each chunk to an in-memory pipe (io.Pipe), and a second goroutine reads the data from the pipe and writes it to a new file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide. | |
// Use of this source code is governed by a MIT license that can be found in the LICENSE file. | |
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Package main implements streaming of a file through a FIFO pipe, from which data is written to a new file. | |
// Using a FIFO pipe avoids having to allocate buffers within the for loop, which eliminates a data race condition | |
// on reading data from the file and sending it to the channel. However, using a pipe is slower than a | |
// channel. A quick test with a 3.9GB binary file required 5.3s with a channel, and 12.6s with a pipe. | |
// I've created a companion gist, "go-stream-file-between-goroutines-with-channel", for comparison.
// Package main is the entry point for the application and is responsible for configuring the environment. | |
package main | |
import ( | |
"bufio" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"sync" | |
"time" | |
) | |
/************************************************* CONSTANTS **********************************************************/ | |
// Tunables and file locations used by the streaming demo.
const (
	// kMaxBufferSize caps how many bytes a single pass of a read loop may
	// pull in: 1024 << 5 is 32768 bytes (32 KiB).
	kMaxBufferSize = 1024 << 5

	// kFromPath names the file that is read and streamed through the pipe.
	// Other inputs that were used while experimenting:
	//   "/home/mjgardner/Downloads/abc.rar" // 56.2GB
	//   "/home/mjgardner/Downloads/def.exe" // 220MB
	kFromPath = "/home/mjgardner/Downloads/ghi.zip" // 3.9GB

	// kToPath names the file that is created and filled with the data that
	// arrives through the pipe.
	kToPath = "/home/mjgardner/Downloads/ANewBigFile.zip"
)
/**********************************************************************************************************************/ | |
/**************************************************** MAIN ************************************************************/ | |
// Function main is the entry point for the application and is responsible for configuring its environment. | |
func main() { | |
// Variable wg is main's WaitGroup, which detects when all of the goroutines that were launched have completed. | |
var wg sync.WaitGroup | |
// Start our timer... | |
start := time.Now() | |
// Create and configure the interfaces used by the application, such as the file system and pipe. | |
pw, pr, ifile, ofile, err := configureInterfaces(kFromPath, kToPath) | |
if err != nil { | |
// Error has already been logged. | |
return | |
} | |
// Automatically close the files when exiting. | |
defer ifile.Close() | |
defer ofile.Close() | |
fmt.Println("Starting processing...\n") | |
// File writing goroutine. | |
wg.Add(1) | |
go func(r *io.PipeReader, of *os.File) { | |
defer wg.Done() | |
// Receive the input file's data through the FIFO pipe. | |
receiver(r, of) | |
}(pr, ofile) | |
// File reading goroutine | |
wg.Add(1) | |
go func(w *io.PipeWriter, ifile *os.File) { | |
defer wg.Done() | |
// Write the input file's data to the FIFO pipe. | |
sender(ifile, w) | |
}(pw, ifile) | |
// Wait here until all goroutines have completed their work. | |
wg.Wait() | |
// Show the duration. | |
fmt.Printf("\nDone processing...\nElapsed: %s", time.Since(start)) | |
} | |
// Function configureInterfaces creates and configures the interfaces used by the application: the input file,
// the output file, and the in-memory pipe that connects the two goroutines.
// Parameter fromPath is the file system path to the file that will be read.
// Parameter toPath is the file system path to the file that will be created (or truncated if it exists).
// Returns a configured pipe writer and reader, input file handle, and output file handle,
// and a nil error on success; otherwise all items are nil except for the error.
// A failure is logged here and then returned, so the caller decides how to exit and does not need to log it again.
func configureInterfaces(fromPath string, toPath string) (*io.PipeWriter, *io.PipeReader, *os.File, *os.File, error) {
	// Open the file at fromPath for reading...
	ifile, err := os.Open(fromPath)
	if err != nil {
		log.Print(err)
		return nil, nil, nil, nil, err
	}

	// Create the output file at toPath for writing...
	ofile, err := os.Create(toPath)
	if err != nil {
		// Do not leak the input handle that was already opened.
		ifile.Close()
		log.Print(err)
		return nil, nil, nil, nil, err
	}

	// Create the pipe last: it holds no OS resource, so it needs no cleanup
	// on the failure paths above.
	pr, pw := io.Pipe()

	return pw, pr, ifile, ofile, nil
}
// Function sender write the input file's data to the FIFO pipe. | |
// Parameter if is the input file's handle. | |
// Parameter pw pipe's writer where data from the input file will be written. | |
func sender(ifile *os.File, pw *io.PipeWriter) error { | |
defer pw.Close() | |
// Cumulative counters | |
nBytes := uint64(0) | |
nChunks := uint64(0) | |
// The buffer for data that is read from the file. | |
buf := make([]byte, kMaxBufferSize) | |
fmt.Println("\tSender is starting to read the input file...") | |
// Loop through the input file reading chunks of data, which is sent over the pipe. | |
for { | |
// Read a chunk of data from the file... | |
n, err := ifile.Read(buf[:cap(buf)]) | |
// Did we read any data from the file? Was there an error? | |
if n == 0 { | |
if err == nil { | |
// No data and no error; Keep going... | |
continue | |
} | |
if err == io.EOF { | |
// End of file, so exit the loop... | |
break | |
} | |
// Ouch! Log the error and exit. | |
log.Fatal(err) | |
return err | |
} | |
// Update the cumulative counters. | |
nChunks++ | |
nBytes += uint64(n) | |
// Send the data over the channel. | |
pw.Write(buf[:n]) | |
} | |
// Signal the receiving goroutines that there is no more data. | |
fmt.Println("\tSender is closing the channel to signal the receiver that no more data is coming, and exiting...") | |
pw.Close() | |
// When there is no more data to process, display the sender's status. | |
fmt.Printf("\tSent:\t\tnBytes: %d, nChunks: %d\n", nBytes, nChunks) | |
return nil | |
} | |
// Receive the input file's data through the FIFO pipe. | |
// Parameter of is the output file's handle. | |
// Parameter pr is the pipe's reader from which file data will be read. | |
func receiver(pr *io.PipeReader, of *os.File) error { | |
// Cumulative counters | |
nBytes := uint64(0) | |
nChunks := uint64(0) | |
// Buffered writing to the output file. | |
writer := bufio.NewWriter(of) | |
// Buffer used for each chunk of data from the pipe. | |
buf := make([]byte, kMaxBufferSize) | |
fmt.Println("\tReceiver is waiting for data in the channel, so it can write it to the output file...") | |
// While there is data to read in the pipe, we will get it and writing it to the output file. | |
for { | |
n, err := pr.Read(buf[:cap(buf)]) | |
// Did we read any data from the file? Was there an error? | |
if n == 0 { | |
if err == nil { | |
// No data and no error; Keep going... | |
continue | |
} | |
if err == io.EOF { | |
// End of file, so exit the loop... | |
break | |
} | |
// Ouch! Log the error and exit. | |
log.Fatal(err) | |
return err | |
} | |
// Write the chunk to the output file. | |
writer.Write(buf[:n]) | |
// Update the cumulative counters... | |
nBytes += uint64(n) | |
nChunks++ | |
} | |
fmt.Println("\tReceiver has emptied the pipe and is exiting...") | |
// When there is no more data to process, display the receiver's status. | |
fmt.Printf("\tReceived:\tnBytes: %d, nChunks: %d\n", nBytes, nChunks) | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment