Last active September 18, 2024 06:10
SQLite query results as nodejs readable stream
# Create the sample database and fill it with rows (a string name and an integer value).
echo 'create table foo ( name string, value int )' | sqlite3 foo.db
for idx in {0..1000}
do
  echo "insert into foo( name, value ) values( 'abc${idx}', $idx );"
done | sqlite3 foo.db
/*
 * Return SQLite3 query results as a Node.js stream, sensitive to backpressure.
 *
 * Assumes a foo.db file with a lot of rows - I used 1000 with a number & a string
 */
const stream = require('stream');
const sqlite = require('sqlite3');

class DBStream extends stream.Readable {

  constructor( opts ) {
    super( { objectMode: true } );
    this.sql = opts.sql;
    this.db = new sqlite.Database( opts.db );
    this.stmt = this.db.prepare( this.sql );
    // Once all rows have been read, release the statement, then close the database.
    this.on( 'end', () => this.stmt.finalize( () => this.db.close() ));
  }

  _read() {
    let strm = this;
    this.stmt.get( function(err,result) {
      // If result is undefined, push null, which will end the stream.
      /*
       * Should have no backpressure problems,
       * since _read is only called when the downstream is
       * ready to fetch data
       */
      err ?
        strm.destroy( err ) :
        strm.push( result || null);
    })
  }

}

/*
 * simple test
 */
stream.pipeline(
  new DBStream( { db: 'foo.db', sql:'select * from foo' } ),
  new stream.Transform( { objectMode: true, transform:( data, enc, cb ) => cb( null, JSON.stringify( data ) + '\n' ) } ),
  process.stdout,
  err => { err && console.error(err); process.exit( err ? 1 : 0 ) }
)

Thank you so much! I needed to stream a result to a CSV file. This does the job 👌🏻
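
For the CSV use case mentioned here, one possible shape is a transform that turns row objects into CSV lines. This is only a sketch under assumptions: `csvField` and `createCsvTransform` are hypothetical helpers, the column list is supplied by the caller, and the quoting follows the common RFC 4180 convention (fields containing commas, quotes or line breaks are wrapped in double quotes, with embedded quotes doubled).

```javascript
const { Transform } = require('node:stream');

// Quote a single field only when it contains a comma, a quote or a line break.
function csvField(value) {
  const text = value === null || value === undefined ? '' : String(value);
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

// Object-mode input (one row object per chunk), text output (one CSV line per row).
function createCsvTransform(columns) {
  let headerWritten = false;
  return new Transform({
    writableObjectMode: true,
    transform(row, _encoding, callback) {
      let out = '';
      if (!headerWritten) {
        out += columns.map(csvField).join(',') + '\n';
        headerWritten = true;
      }
      out += columns.map((column) => csvField(row[column])).join(',') + '\n';
      callback(null, out);
    },
  });
}
```

Placed in a `stream.pipeline` between a row stream such as the gist's `DBStream` and `fs.createWriteStream('foo.csv')`, it would replace the `JSON.stringify` transform from the simple test.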


Thanks a lot! I took inspiration from your snippet and created my own version of it:

import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { Database, Statement } from 'sqlite'
import { Readable } from 'stream'

/**
 * Based on:
 */
export class SqliteReadable<T = any> extends Readable implements ReadableTyped<T> {
  constructor(private stmt: Statement) {
    super( { objectMode: true } );

    // might be unnecessary
    // this.on( 'end', () => {
    //   console.log(`SQLiteStream end`)
    //   void this.stmt.finalize()
    // })
  }

  static async create<T = any>(db: Database, sql: string): Promise<SqliteReadable<T>> {
    const stmt = await db.prepare(sql)
    return new SqliteReadable<T>(stmt)
  }

  /**
   * Necessary to call it, otherwise this error might occur on `db.close()`:
   * SQLITE_BUSY: unable to close due to unfinalized statements or unfinished backups
   */
  async close(): Promise<void> {
    await this.stmt.finalize()
  }

  // count = 0 // use for debugging

  override async _read(): Promise<void> {
    // console.log(`read ${++this.count}`) // debugging
    try {
      const r = await this.stmt.get<T>()
      this.push(r || null)
    } catch(err) {
      console.log(err) // todo: check if it's necessary
      this.emit('error', err)
    }
  }
}
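
On the question of when to finalize: one option, sketched here in plain JavaScript rather than taken from either version above, is to finalize in the stream's `_destroy` hook. In current Node.js versions a readable is destroyed automatically after it ends (the `autoDestroy` default), and `_destroy` also runs on errors and on an explicit `destroy()`, so the statement would be released even if the consumer stops early. `FakeStatement`, `AutoFinalizingStream` and `demoEarlyExit` are hypothetical names; the fake statement only mimics `get` and `finalize` with callbacks.

```javascript
const { Readable } = require('node:stream');
const { once } = require('node:events');

// Hypothetical stand-in for a prepared statement: get(cb) yields one row per
// call (undefined when exhausted) and finalize(cb) releases the statement.
class FakeStatement {
  constructor(rows) {
    this.rows = rows.slice();
    this.finalized = false;
  }
  get(cb) {
    const row = this.rows.shift();
    setImmediate(() => cb(null, row));
  }
  finalize(cb) {
    this.finalized = true;
    setImmediate(cb);
  }
}

class AutoFinalizingStream extends Readable {
  constructor(stmt) {
    super({ objectMode: true });
    this.stmt = stmt;
  }
  _read() {
    this.stmt.get((err, row) => {
      if (this.destroyed) return; // ignore a callback that arrives after destroy()
      if (err) return this.destroy(err);
      this.push(row === undefined ? null : row);
    });
  }
  // Called after a normal end, after an error, and after an explicit destroy().
  _destroy(err, callback) {
    this.stmt.finalize(() => callback(err));
  }
}

async function demoEarlyExit() {
  const source = Array.from({ length: 50 }, (_, i) => ({ value: i }));
  const stmt = new FakeStatement(source);
  const rows = new AutoFinalizingStream(stmt);
  const closed = once(rows, 'close'); // subscribe before triggering destroy()
  rows.once('data', () => rows.destroy()); // the consumer gives up after one row
  await closed;
  return { finalized: stmt.finalized, unreadRows: stmt.rows.length };
}
```

With this shape a separate `close()` call would not be needed for the statement itself, though closing the database afterwards remains the caller's job.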



rmela commented Aug 21, 2021 via email


wil92 commented Apr 14, 2022

good stuff
