Skip to content

Instantly share code, notes, and snippets.

@twick00
Created July 14, 2020 21:36
Show Gist options
  • Save twick00/42b8d3fb2ea3bce982c0df13878aee44 to your computer and use it in GitHub Desktop.
Save twick00/42b8d3fb2ea3bce982c0df13878aee44 to your computer and use it in GitHub Desktop.
/*
* decaffeinate suggestions:
* DS102: Remove unnecessary code created because of implicit returns
* DS201: Simplify complex destructure assignments
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
import _ from "underscore";
_.mixin(require("underscore.deep"));
const async = require("async");
const genericPool = require("generic-pool");
const debug = require("debug")("thrift-pool");
const TIMEOUT_MESSAGE = "Thrift-pool: Connection timeout";
const CLOSE_MESSAGE = "Thrift-pool: Connection closed";
// create_cb creates and initializes a connection
// @param thrift, used to create connection
// @param pool_options, host and port used to create connection
// @param thrift_options, passed to thrift connection,
const create_cb = function(thrift, pool_options, thrift_options, cb) {
let connection;
cb = _.once(cb);
if (pool_options.ssl == null) { pool_options.ssl = false; }
if (pool_options.ssl) {
connection = thrift.createSSLConnection(pool_options.host, pool_options.port, thrift_options);
} else {
connection = thrift.createConnection(pool_options.host, pool_options.port, thrift_options);
}
connection.__ended = false;
if (pool_options.ttl != null) {
connection.__reap_time = Date.now() + _.random((pool_options.ttl / 2), (pool_options.ttl * 1.5));
}
connection.on("connect", function() {
debug("in connect callback");
connection.connection.setKeepAlive(true);
return cb(null, connection);
});
connection.on("error", function(err) {
debug("in error callback");
connection.__ended = true;
return cb(err);
});
connection.on("close", function() {
debug("in close callback");
connection.__ended = true;
return cb(new Error(CLOSE_MESSAGE));
});
// timeout listener only applies if timeout is passed into thrift_options
if (thrift_options.timeout != null) {
debug("adding timeout listener");
return connection.on("timeout", function() {
debug("in timeout callback");
connection.__ended = true;
return cb(new Error(TIMEOUT_MESSAGE));
});
}
};
// create_pool initializes a generic-pool
// @param thrift library to use to in create_cb
// @param pool_options, host/port are used in create_cb
// max, min, idleTimeouts are used by generic pool
// @param thrift_options used in create_cb
const create_pool = function(thrift, pool_options = {}, thrift_options = {}) {
let pool;
return pool = genericPool.Pool({
name: "thrift",
create(cb) {
return create_cb(thrift, pool_options, thrift_options, cb);
},
destroy(connection) {
debug("in destroy");
return connection.end();
},
validate(connection) {
debug("in validate");
if (connection.__ended) { return false; }
if (pool_options.ttl == null) { return true; }
return connection.__reap_time > Date.now();
},
log: pool_options.log,
max: pool_options.max_connections,
min: pool_options.min_connections,
idleTimeoutMillis: pool_options.idle_timeout
});
};
export default function(thrift, service, pool_options = {}, thrift_options = {}) {
for (let key of ["host", "port"]) { if (!pool_options[key]) { throw new Error(`Thrift-pool: You must specify ${key}`); } }
pool_options = _(pool_options).defaults({
log: false, // true/false or function
max_connections: 1, // Max number of connections to keep open at any given time
min_connections: 0, // Min number of connections to keep open at any given time
idle_timeout: 30000
}); // Time (ms) to wait until closing idle connections
const pool = create_pool(thrift, pool_options, thrift_options);
// add_listeners adds listeners for error, close, and timeout
// @param connection, connection to add listeners to
// @param cb_error, callback to attach to "error" listener
// @param cb_timeout, callback to attach to "timeout" listener
// @param cb_close, callback to attach to "close" listener
const add_listeners = function(connection, cb_error, cb_timeout, cb_close) {
connection.on("error", cb_error);
connection.on("close", cb_close);
if (thrift_options.timeout != null) {
return connection.on("timeout", cb_timeout);
}
};
// remove_listeners removes error, timeout, and close listeners with given callbacks
// @param connection, connection to remove listeners from
// @param cb_error, error callback to remove from "error" listener
// @param cb_timeout, timeout callback to remove from "timeout" listener
// @param cb_close, close callback to remove from "close" listener
const remove_listeners = function(connection, cb_error, cb_timeout, cb_close) {
connection.removeListener("error", cb_error);
connection.removeListener("close", cb_close);
if (thrift_options.timeout != null) {
return connection.removeListener("timeout", cb_timeout);
}
};
// wrap_thrift_fn when called with a function and arguments/callback:
// - acquires a connection
// - adds additional connection event listeners
// - creates a client with the acquired connection
// - calls client with fn and passed args and callback
// - connection is released before results are returned
// @return, function that takes in arguments and a callback
const wrap_thrift_fn = fn => (function(...args1) {
let adjustedLength = Math.max(args1.length, 1), args = args1.slice(0, adjustedLength - 1), cb = args1[adjustedLength - 1];
return pool.acquire(function(err, connection) {
debug("Connection acquired");
debug({err});
debug({connection});
if (err != null) { return cb(err); }
cb = _.once(cb);
const cb_error = function(err) {
debug("in error callback, post-acquire listener");
return cb(err);
};
const cb_timeout = function() {
debug("in timeout callback, post-acquire listener");
return cb(new Error(TIMEOUT_MESSAGE));
};
const cb_close = function() {
debug("in close callback, post-acquire listener");
return cb(new Error(CLOSE_MESSAGE));
};
add_listeners(connection, cb_error, cb_timeout, cb_close);
const client = thrift.createClient(service, connection);
debug("Client created");
debug({client});
return client[fn](...args, function(err, ...results) {
debug("In client callback");
remove_listeners(connection, cb_error, cb_timeout, cb_close);
pool.release(connection);
return cb(err, ...results);
});
});
});
// The following returns a new object with all of the keys of an
// initialized client class.
// Note: _.mapValues only supports "simple", "vanilla" objects that
// are not associated with a class. Since service.Client.prototype
// does not fall into that category, need to call _.clone first
return _.mapValues(_.clone(service.Client.prototype), (fn, name) => wrap_thrift_fn(name));
};
// For unit testing
_.extend(module.exports, {_private: {create_pool, TIMEOUT_MESSAGE, CLOSE_MESSAGE}});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment