Created
July 21, 2014 12:03
-
-
Save bennadel/e6fc58f1526790bd4103 to your computer and use it in GitHub Desktop.
My First Look At Streams In Node.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Include module references.
var fileSystem = require( "fs" );
var stream = require( "stream" );
var StringDecoder = require( "string_decoder" ).StringDecoder;
var util = require( "util" );
var chalk = require( "chalk" );
// ---------------------------------------------------------- // | |
// ---------------------------------------------------------- // | |
// I am a Transform stream (writable/readable): text is written into me and every match
// of the given regular expression comes out of the readable side as its own chunk.
function RegExStream( pattern ) {

	// Support factory-style invocation - RegExStream( pattern ) - by delegating to the
	// "new" form and handing back the resulting instance.
	var isInstance = ( this instanceof RegExStream );

	if ( ! isInstance ) {

		return( new RegExStream( pattern ) );

	}

	// Run the Transform constructor against this instance. Object mode is requested
	// so that each call to read() results in exactly one whole match, never a partial
	// match of the pattern.
	var transformOptions = {
		objectMode: true
	};

	stream.Transform.call( this, transformOptions );

	// Anything that is not already a RegExp instance gets compiled into a global one,
	// so that the rest of the stream can treat the pattern uniformly.
	var normalizedPattern = ( pattern instanceof RegExp )
		? pattern
		: new RegExp( pattern, "g" )
	;

	// Keep a private copy of the pattern. The stream mutates RegExp state (lastIndex)
	// while matching, and the calling scope still holds a reference to the original.
	this._pattern = this._clonePattern( normalizedPattern );

	// I hold the portion of the input that has not been consumed yet.
	this._inputBuffer = "";

}

// Wire up the prototype chain so that RegExStream inherits the Transform methods.
// --
// NOTE: This only links the prototypes - the instance properties are set up by the
// Transform constructor call inside RegExStream().
util.inherits( RegExStream, stream.Transform );
// I clone the given regular expression instance, returning a unique reference that is
// guaranteed to include the "g" (global) flag. All other flags are carried over as-is.
// --
// NOTE: A serialized RegExp looks like "/source/flags". The source may itself contain
// "/" characters (escaped, or inside a character class) but the flags never do. As
// such, the LAST slash is the only reliable delimiter between the two parts - splitting
// on every slash would truncate the source and mistake part of it for the flags.
RegExStream.prototype._clonePattern = function( pattern ) {

	var serialized = pattern.toString();
	var flagsDelimiter = serialized.lastIndexOf( "/" );

	// Everything between the leading slash and the last slash is the source; whatever
	// follows the last slash is the (possibly empty) set of flags.
	var regex = serialized.slice( 1, flagsDelimiter );
	var flags = serialized.slice( flagsDelimiter + 1 );

	// Make sure the pattern uses the global flag so that repeated exec() calls walk
	// through the input instead of returning the first match over and over.
	if ( flags.indexOf( "g" ) === -1 ) {

		flags += "g";

	}

	return( new RegExp( regex, flags ) );

};
// I finalize the internal state when the write stream has finished writing. This gives
// us one more opportunity to transform data and push values onto the output stream:
// since no more input is coming, every non-empty match left in the internal buffer is
// now safe to emit.
// --
// flushCompleted - callback to invoke once the remaining matches have been pushed.
RegExStream.prototype._flush = function( flushCompleted ) {

	logInput( "@flush - buffer:", this._inputBuffer );

	var match = null;

	// Always scan from the start of the buffer, regardless of where a previous exec()
	// may have left the pattern.
	this._pattern.lastIndex = 0;

	// Loop over any remaining matches in the internal buffer.
	while ( ( match = this._pattern.exec( this._inputBuffer ) ) !== null ) {

		// A zero-length match does not advance lastIndex, which would make this loop
		// spin forever on patterns that can match nothing (ex, /\d*/). Step past it
		// manually; an empty match carries no content worth emitting.
		if ( match[ 0 ] === "" ) {

			this._pattern.lastIndex++;
			continue;

		}

		logInput( "Push( _flush ):", match[ 0 ] );
		this.push( match[ 0 ] );

	}

	// Clean up the internal buffer (for memory management).
	this._inputBuffer = "";

	// Signal that the input has been fully processed. We don't push( null ) ourselves:
	// the Transform base class ends the readable side once this callback is invoked.
	flushCompleted();

};
// I transform the given input chunk into zero or more output chunks - one per complete
// match of the pattern found in the buffered input.
// --
// chunk - Buffer (or string) holding the next piece of input.
// encoding - encoding of the chunk; not used, Buffers are always decoded as UTF-8.
// getNextChunk - callback to invoke once this chunk has been fully processed.
// --
// NOTE: When a chunk produces no match at all, the buffered input is discarded in order
// to keep memory usage bounded. As a consequence, a match that only becomes visible
// once it is combined with a LATER chunk (ex, /foo/ fed "fo" and then "o") is not found.
RegExStream.prototype._transform = function( chunk, encoding, getNextChunk ) {

	var text = null;

	// Decode Buffers through a stateful decoder so that a multi-byte UTF-8 character
	// that is split across two chunks is reassembled instead of being turned into
	// replacement characters. The decoder is created lazily, on the first Buffer.
	if ( Buffer.isBuffer( chunk ) ) {

		if ( ! this._decoder ) {

			this._decoder = new StringDecoder( "utf8" );

		}

		text = this._decoder.write( chunk );

	} else {

		text = chunk.toString( "utf8" );

	}

	logInput( ">>> Chunk:", text );

	// Add the chunk to the internal buffer. Since we might be matching values across
	// multiple chunks, we need to build up the buffer with each unused chunk.
	this._inputBuffer += text;

	// Since we don't want to keep building a large internal buffer, we want to pare
	// down the content that we no longer need. As such, we keep track of the last
	// relevant index so that we can drop any portion of the content that will not be
	// needed in the next chunk-processing.
	var nextOffset = null;
	var match = null;

	// Always scan from the start of the (already trimmed) internal buffer.
	this._pattern.lastIndex = 0;

	// Loop over the matches on the buffered input.
	while ( ( match = this._pattern.exec( this._inputBuffer ) ) !== null ) {

		// A zero-length match does not advance lastIndex, which would make this loop
		// spin forever on patterns that can match nothing (ex, /\d*/). Step past it
		// manually; an empty match carries no content worth emitting.
		if ( match[ 0 ] === "" ) {

			this._pattern.lastIndex++;
			continue;

		}

		// If the current match butts up against the end of the input buffer, we are
		// in danger of an invalid match - a match that will actually span across two
		// (or more) successive writes. As such, we can't use it until the next write
		// (or flush). The next relevant offset is BEFORE this match, and since nothing
		// but empty matches can follow it, we can stop scanning.
		if ( this._pattern.lastIndex >= this._inputBuffer.length ) {

			logInput( "Need to defer '" + match[ 0 ] + "' since its at end of the chunk." );
			nextOffset = match.index;
			break;

		}

		// Otherwise, the match ends strictly inside the buffer, so we know we haven't
		// matched a partial input and can safely push the match into the output. The
		// next relevant offset will be after this match.
		logInput( "Push:", match[ 0 ] );
		this.push( match[ 0 ] );
		nextOffset = this._pattern.lastIndex;

	}

	// If we have consumed (or deferred) a portion of the input, reduce the current
	// input buffer to be only the unused portion. If no match was found at all, reset
	// the internal buffer entirely (see NOTE above).
	if ( nextOffset !== null ) {

		this._inputBuffer = this._inputBuffer.slice( nextOffset );

	} else {

		this._inputBuffer = "";

	}

	// Reset the regular expression so that it can pick up at the start of the internal
	// buffer when the next chunk is ready to be processed.
	this._pattern.lastIndex = 0;

	// Tell the source that we've fully processed this chunk.
	getNextChunk();

};
// ---------------------------------------------------------- // | |
// ---------------------------------------------------------- // | |
// Create an input stream from the file system.
var inputStream = fileSystem.createReadStream( "./input.txt" );

// Create a Regular Expression stream that will run through the input and find matches
// for the given pattern - "words".
var regexStream = inputStream.pipe( new RegExStream( /\w+/i ) );

// When the regex stream is ready, start reading-in word matches.
regexStream.on(
	"readable",
	function() {

		var content = null;

		// Since the RegExStream operates in "object mode", we know that we'll get a
		// single match with each .read() call. An empty internal buffer is signaled
		// with null specifically, so we compare against null rather than relying on
		// truthiness (which would also stop the loop on any falsy chunk).
		while ( ( content = this.read() ) !== null ) {

			logOutput( "Pattern match: " + content.toString( "utf8" ) );

		}

	}
);
// ---------------------------------------------------------- // | |
// ---------------------------------------------------------- // | |
// I write all of the given values to the console in a single console.log() call, with
// each value individually colored magenta so that input-side logging stands out.
function logInput() {

	var coloredValues = [];

	for ( var i = 0 ; i < arguments.length ; i++ ) {

		coloredValues.push( chalk.magenta( arguments[ i ] ) );

	}

	console.log.apply( console, coloredValues );

}
// I write all of the given values to the console in a single console.log() call, with
// each value individually rendered as white text on a magenta background so that
// output-side logging stands out.
function logOutput() {

	var coloredValues = [];

	for ( var i = 0 ; i < arguments.length ; i++ ) {

		coloredValues.push( chalk.bgMagenta.white( arguments[ i ] ) );

	}

	console.log.apply( console, coloredValues );

}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment