Created
March 26, 2015 19:14
-
-
Save bjoerge/9e88a877a0633705b728 to your computer and use it in GitHub Desktop.
Parse CSV into objects with RxJS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Parse file.csv into one object per data row, keyed by the values of the header row.
const Rx = require('rx');
const csv = require('csv-parse');
const fs = require('fs');

// The parsed-row stream must be bound to a name because it is consumed twice:
// once for the header (take(1)) and once for the data rows (skip(1)).
// The original snippet used `rows` here without ever defining it.
const rows = Rx.Node.fromReadableStream(fs.createReadStream('file.csv').pipe(csv()));

rows
  .skip(1) // everything after the header line
  .withLatestFrom(rows.take(1), (row, header) => {
    // Map header[i] => row[i]
    return row.reduce((rowObj, cell, i) => {
      rowObj[header[i]] = cell;
      return rowObj;
    }, {});
  })
  .subscribe((row) => {
    console.log("Row: %s", JSON.stringify(row, null, 2));
  });
@iDVB is right, but with a small change it works, and it is also more readable:
// Turn each data row of file.csv into an object keyed by the header row.
// Assumes Rx, fs and csv are required as in the first snippet (rx, fs, csv-parse).
const csvFile$ = Rx.Node.fromReadableStream(fs.createReadStream('file.csv').pipe(csv()));
const header$ = csvFile$.take(1); // first parsed line: the column names
const rows$ = csvFile$.skip(1); // remaining parsed lines: the data

// withLatestFrom pairs each data row with the most recent header value,
// then the row is zipped into { [header[i]]: row[i] }.
rows$
  .withLatestFrom(header$, (row, header) =>
    row.reduce((rowObj, cell, i) => {
      rowObj[header[i]] = cell;
      return rowObj;
    }, {}))
  .subscribe((row) => {
    console.log("Row: %s", JSON.stringify(row, null, 2));
  });
Rx.Node no longer seems to be maintained and is not compatible with RxJS 5. Here is an RxJS 5-ready version of `fromStream`:
import Rx from 'rxjs/Rx';
const Observable = Rx.Observable;
/**
 * Wrap a Node.js readable stream as an RxJS 5 Observable.
 *
 * Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
 *
 * The stream is paused right away and only resumed once the listeners are
 * attached, so no chunk is emitted before the first subscriber is listening.
 * The result is multicast with publish().refCount(): subscribers share one
 * set of stream listeners, which are detached when the inner subscription
 * is torn down.
 *
 * @param {stream.Readable} stream - Stream to observe.
 * @param {string} [finishEventName='end'] - Event that completes the Observable.
 * @param {string} [dataEventName='data'] - Event whose payload is passed to next().
 * @returns {Observable} Shared Observable of the stream's data events.
 */
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  stream.pause();

  const source = new Observable((subscriber) => {
    // [event name, handler] pairs, attached and later detached in this order.
    const bindings = [
      [dataEventName, (chunk) => subscriber.next(chunk)],
      ['error', (err) => subscriber.error(err)],
      [finishEventName, () => subscriber.complete()],
    ];

    bindings.forEach(([eventName, handler]) => stream.addListener(eventName, handler));
    stream.resume();

    return () => {
      bindings.forEach(([eventName, handler]) => stream.removeListener(eventName, handler));
    };
  });

  return source.publish().refCount();
}
And here is my own version of the RxJS CSV parser, which is more concise and supports parser options:
import Rx from 'rxjs/Rx';
import fs from 'fs';
import csv from 'csv-parse';
import fromStream from './rxjs-from-stream';
const Observable = Rx.Observable;
/**
 * Lazily parse a CSV file into an Observable of parsed records.
 *
 * Each subscription opens its own read stream and csv-parse parser, so the
 * file is only read when someone subscribes.
 *
 * @param {string} file - Path of the CSV file to read.
 * @param {Object} [csvParserOptions] - Options passed unchanged to csv-parse.
 * @returns {Observable} Records emitted by the parser. Errors if the file
 *   cannot be read or the parser emits an 'error' event.
 */
const parse = (file, csvParserOptions) => new Observable((observer) => {
  const input = fs.createReadStream(file);
  // pipe() does not forward errors from the source stream. Without this
  // listener a read failure (e.g. a missing file) becomes an unhandled
  // 'error' event instead of an error notification. The listener stays
  // attached on purpose: after unsubscription observer.error is a no-op,
  // whereas an 'error' event with no listener would throw.
  input.on('error', (err) => observer.error(err));

  const parser = csv(csvParserOptions);
  const lines$ = fromStream(input.pipe(parser));

  // The subscribe function must return teardown logic (a Subscription or a
  // function), not an Observable, so return the inner Subscription.
  return lines$.subscribe(observer);
});

export default parse;
Example of use:
import path from 'path';
import csv from './utils/rxjs-csv';
/**
 * Build an RxJS observer that logs every notification, prefixed with `name`.
 * Example: `csv('file.csv', { columns: true }).subscribe(getLogSubscriber('csv'))`.
 *
 * @param {string} name - Label prepended to each log line.
 * @returns {{next: Function, error: Function, complete: Function}} Observer object.
 */
const getLogSubscriber = (name) => ({
  next(o) { console.log(`${name} next`, o); },
  // RxJS observers receive errors through `error`; a method named `err` is never called.
  error(err) { console.error(`${name} error`, err); },
  complete() { console.log(`${name} complete`); },
});
// Parse file.csv and log each record. The options object is passed through to
// csv-parse; `columns: true` asks csv-parse to treat the first line as column names.
csv('file.csv', { columns: true })
  .subscribe({
    next(o) { console.log(o); },
    // Must be named `error` (not `err`) for RxJS to deliver error notifications here.
    error(err) { console.error('error:', err); },
    complete() { console.log('complete'); },
  });
@QuentinRow: That's very cool, and it probably deserves its own gist. Where is `getLogSubscriber` used?
Also, there is a typo: `csv('file.csv'), { columns: true })` should be `csv('file.csv', { columns: true })`.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
rows is not defined.