Last active
August 29, 2015 14:01
-
-
Save Kuirak/5ded9c36be65bd3c9b51 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
/** | |
* Created by Jonas Kugelmann on 09.05.2014. | |
*/ | |
var stream = require('stream') | |
,util =require('util'); | |
var input1 = ['one','two','three','four','five']; | |
var input2 = ['ONE','TWO','THREE','FOUR','FIVE']; | |
util.inherits(InputStream,stream.Readable); | |
function InputStream(input){ | |
stream.Readable.call(this,{objectMode:true}); | |
this.input =input || []; | |
} | |
InputStream.prototype._read =function(){ | |
if(this.input.length <=0){ | |
return; | |
} | |
this.push(this.input.pop()); | |
}; | |
var inputStream1 = new InputStream(input1); | |
var inputStream2 = new InputStream(input2); | |
util.inherits(ConsoleStream,stream.Writable); | |
function ConsoleStream(){ | |
stream.Writable.call(this,{objectMode:true}); | |
} | |
ConsoleStream.prototype._write = function(chunk,enc,next){ | |
console.log(chunk); | |
next(); | |
}; | |
var consoleStream = new ConsoleStream(); | |
util.inherits(ConcatProcessor,stream.Transform); | |
function ConcatProcessor(){ | |
stream.Transform.call(this,{objectMode:true}); | |
} | |
ConcatProcessor.prototype._transform = function(chunk,enc,next){ | |
var result =''; | |
chunk.forEach(function(item){ | |
result += item; | |
}); | |
this.push(result); | |
next(); | |
}; | |
var concatProcessor = new ConcatProcessor(); | |
function CombineStreams(){ | |
var data=[]; | |
var args = Array.prototype.slice.call(arguments); | |
this.target =function(target){ | |
args.forEach(function(stream){ | |
//TODO check if readable stream | |
stream.on('data',function(chunk){ | |
stream.pause(); | |
data.push(chunk); | |
if(data.length ===args.length){ | |
target.write(data); | |
data =[]; | |
args.forEach(function(stream){ | |
stream.resume(); | |
}) | |
} | |
}) | |
}); | |
return target; | |
} | |
} | |
new CombineStreams(inputStream1,inputStream2).target(concatProcessor).pipe(consoleStream); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment