Created
July 30, 2019 09:48
-
-
Save Kamisama666/fcf3d30914080f15c50d4fb0173022d0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const async = require('async'); | |
const pg = require('pg') | |
// QueryConnect: (QuerySource, Callback) -> Connection | |
// QueryDisconnect: Connection -> Connection | |
// getSmartQuery : (QueryConnect, QueryDisconnect, QuerySource) -> SmartQuery | |
function getSmartQuery(connect, disconnect, querySource) { | |
let conn = null; | |
function connectClient(query, _connectClient) { | |
connect(querySource, function _handleConnect(error, _conn) { | |
if (!error) conn = _conn; | |
_connectClient(error, query); | |
}) | |
} | |
function runWithContext(runner, query, cb) { | |
runner(conn, query, cb); | |
} | |
// QueryRunner: (Connection, Query || [Query], Callback) -> Query || [Query] | |
// SmartQuery : (QueryRunner, Boolean, Query || [Query], Callback) -> QueryResult | |
return function smartQuery(queryRunner, shouldDisconnect, query, cb) { | |
const runQueryWithContext = async.apply(runWithContext, queryRunner); | |
const runSmartQuery = conn ? runQueryWithContext : async.seq(connectClient, runQueryWithContext); | |
runSmartQuery(query, function _queryRunner(error, result) { | |
if (shouldDisconnect) { | |
disconnect(conn); | |
conn = null; | |
} | |
cb(error, result); | |
}) | |
} | |
} | |
/** | |
This allows you to run multiple queries in an asynchronous pipeline without caring about handling the connection. | |
It stablishes the connection lazily, only once you run it for the first time. And closes it when you call it | |
with shouldDisconnect=true, after you are done with all the queries. | |
Example using the Async library and postgres: | |
**/ | |
const dbString = 'CONNECTIONSTRING'; | |
const pgPool = new pg.Pool({ connectionString: dbString }); | |
// at this point we still haven't connected | |
const smartQuery = getSmartQuery(connectToPostgres, disconnectFromPostgre, pgPool); | |
function connectToPostgres(pool, cb) { | |
pool.connect(function _handleConnect(error, client, release) { | |
cb(error, {client, release}); | |
}); | |
} | |
function disconnectFromPostgres(conn) { | |
conn.release(); | |
return conn; | |
} | |
function runSimpleQuery(conn, query, cb) { | |
conn.client.query(query, cb); | |
} | |
function runQueries(conn, queries, cb) { | |
asyncNode.eachLimit(queries, 50, conn.client.query.bind(conn.client), function runQuery(error) { | |
cb(error, queries); | |
}); | |
} | |
function getData(selectQuery, cb) { | |
// since we pass 'false', it won't disconnect after this | |
smartQuery(runSimpleQuery, false, selectQuery, cb); | |
} | |
function inserData(data, cb) { | |
const insertQueries = data.map(row => ({ | |
text: 'INSERT INSERT INTO table2 VALUES('$1', $2)', | |
values: [row.name, row.surname] | |
})); | |
// since we pass 'true', it will disconnect after running queries | |
smartQuery(runQueries, true, insertQueries, cb); | |
} | |
const syncDataFrom = asyncNode.seq( | |
getData, // the connection will be stablished before the first query | |
inserData // the connection will be closed after this | |
); | |
syncDataFrom('select name, surname from table1'); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment