Created
July 14, 2020 21:36
-
-
Save twick00/42b8d3fb2ea3bce982c0df13878aee44 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* decaffeinate suggestions: | |
* DS102: Remove unnecessary code created because of implicit returns | |
* DS201: Simplify complex destructure assignments | |
* DS207: Consider shorter variations of null checks | |
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md | |
*/ | |
import _ from "underscore"; | |
_.mixin(require("underscore.deep")); | |
const async = require("async"); | |
const genericPool = require("generic-pool"); | |
const debug = require("debug")("thrift-pool"); | |
const TIMEOUT_MESSAGE = "Thrift-pool: Connection timeout"; | |
const CLOSE_MESSAGE = "Thrift-pool: Connection closed"; | |
// create_cb creates and initializes a connection | |
// @param thrift, used to create connection | |
// @param pool_options, host and port used to create connection | |
// @param thrift_options, passed to thrift connection, | |
const create_cb = function(thrift, pool_options, thrift_options, cb) { | |
let connection; | |
cb = _.once(cb); | |
if (pool_options.ssl == null) { pool_options.ssl = false; } | |
if (pool_options.ssl) { | |
connection = thrift.createSSLConnection(pool_options.host, pool_options.port, thrift_options); | |
} else { | |
connection = thrift.createConnection(pool_options.host, pool_options.port, thrift_options); | |
} | |
connection.__ended = false; | |
if (pool_options.ttl != null) { | |
connection.__reap_time = Date.now() + _.random((pool_options.ttl / 2), (pool_options.ttl * 1.5)); | |
} | |
connection.on("connect", function() { | |
debug("in connect callback"); | |
connection.connection.setKeepAlive(true); | |
return cb(null, connection); | |
}); | |
connection.on("error", function(err) { | |
debug("in error callback"); | |
connection.__ended = true; | |
return cb(err); | |
}); | |
connection.on("close", function() { | |
debug("in close callback"); | |
connection.__ended = true; | |
return cb(new Error(CLOSE_MESSAGE)); | |
}); | |
// timeout listener only applies if timeout is passed into thrift_options | |
if (thrift_options.timeout != null) { | |
debug("adding timeout listener"); | |
return connection.on("timeout", function() { | |
debug("in timeout callback"); | |
connection.__ended = true; | |
return cb(new Error(TIMEOUT_MESSAGE)); | |
}); | |
} | |
}; | |
// create_pool initializes a generic-pool | |
// @param thrift library to use to in create_cb | |
// @param pool_options, host/port are used in create_cb | |
// max, min, idleTimeouts are used by generic pool | |
// @param thrift_options used in create_cb | |
const create_pool = function(thrift, pool_options = {}, thrift_options = {}) { | |
let pool; | |
return pool = genericPool.Pool({ | |
name: "thrift", | |
create(cb) { | |
return create_cb(thrift, pool_options, thrift_options, cb); | |
}, | |
destroy(connection) { | |
debug("in destroy"); | |
return connection.end(); | |
}, | |
validate(connection) { | |
debug("in validate"); | |
if (connection.__ended) { return false; } | |
if (pool_options.ttl == null) { return true; } | |
return connection.__reap_time > Date.now(); | |
}, | |
log: pool_options.log, | |
max: pool_options.max_connections, | |
min: pool_options.min_connections, | |
idleTimeoutMillis: pool_options.idle_timeout | |
}); | |
}; | |
export default function(thrift, service, pool_options = {}, thrift_options = {}) { | |
for (let key of ["host", "port"]) { if (!pool_options[key]) { throw new Error(`Thrift-pool: You must specify ${key}`); } } | |
pool_options = _(pool_options).defaults({ | |
log: false, // true/false or function | |
max_connections: 1, // Max number of connections to keep open at any given time | |
min_connections: 0, // Min number of connections to keep open at any given time | |
idle_timeout: 30000 | |
}); // Time (ms) to wait until closing idle connections | |
const pool = create_pool(thrift, pool_options, thrift_options); | |
// add_listeners adds listeners for error, close, and timeout | |
// @param connection, connection to add listeners to | |
// @param cb_error, callback to attach to "error" listener | |
// @param cb_timeout, callback to attach to "timeout" listener | |
// @param cb_close, callback to attach to "close" listener | |
const add_listeners = function(connection, cb_error, cb_timeout, cb_close) { | |
connection.on("error", cb_error); | |
connection.on("close", cb_close); | |
if (thrift_options.timeout != null) { | |
return connection.on("timeout", cb_timeout); | |
} | |
}; | |
// remove_listeners removes error, timeout, and close listeners with given callbacks | |
// @param connection, connection to remove listeners from | |
// @param cb_error, error callback to remove from "error" listener | |
// @param cb_timeout, timeout callback to remove from "timeout" listener | |
// @param cb_close, close callback to remove from "close" listener | |
const remove_listeners = function(connection, cb_error, cb_timeout, cb_close) { | |
connection.removeListener("error", cb_error); | |
connection.removeListener("close", cb_close); | |
if (thrift_options.timeout != null) { | |
return connection.removeListener("timeout", cb_timeout); | |
} | |
}; | |
// wrap_thrift_fn when called with a function and arguments/callback: | |
// - acquires a connection | |
// - adds additional connection event listeners | |
// - creates a client with the acquired connection | |
// - calls client with fn and passed args and callback | |
// - connection is released before results are returned | |
// @return, function that takes in arguments and a callback | |
const wrap_thrift_fn = fn => (function(...args1) { | |
let adjustedLength = Math.max(args1.length, 1), args = args1.slice(0, adjustedLength - 1), cb = args1[adjustedLength - 1]; | |
return pool.acquire(function(err, connection) { | |
debug("Connection acquired"); | |
debug({err}); | |
debug({connection}); | |
if (err != null) { return cb(err); } | |
cb = _.once(cb); | |
const cb_error = function(err) { | |
debug("in error callback, post-acquire listener"); | |
return cb(err); | |
}; | |
const cb_timeout = function() { | |
debug("in timeout callback, post-acquire listener"); | |
return cb(new Error(TIMEOUT_MESSAGE)); | |
}; | |
const cb_close = function() { | |
debug("in close callback, post-acquire listener"); | |
return cb(new Error(CLOSE_MESSAGE)); | |
}; | |
add_listeners(connection, cb_error, cb_timeout, cb_close); | |
const client = thrift.createClient(service, connection); | |
debug("Client created"); | |
debug({client}); | |
return client[fn](...args, function(err, ...results) { | |
debug("In client callback"); | |
remove_listeners(connection, cb_error, cb_timeout, cb_close); | |
pool.release(connection); | |
return cb(err, ...results); | |
}); | |
}); | |
}); | |
// The following returns a new object with all of the keys of an | |
// initialized client class. | |
// Note: _.mapValues only supports "simple", "vanilla" objects that | |
// are not associated with a class. Since service.Client.prototype | |
// does not fall into that category, need to call _.clone first | |
return _.mapValues(_.clone(service.Client.prototype), (fn, name) => wrap_thrift_fn(name)); | |
}; | |
// For unit testing | |
_.extend(module.exports, {_private: {create_pool, TIMEOUT_MESSAGE, CLOSE_MESSAGE}}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment