// Created June 7, 2018 23:33
// Save mryellow/2d58a618b24714daaeec4ecbebe3ac71 to your computer and use it in GitHub Desktop.
// This file contains bidirectional Unicode text that may be interpreted or compiled differently
// than what appears below. To review, open the file in an editor that reveals hidden Unicode
// characters. Learn more about bidirectional Unicode characters.
const Tx = require('ethereumjs-tx'); | |
const asyncPriorityQueue = require('async').priorityQueue; | |
// Shared web3 instance. It is assigned by the exported factory function and
// read by every helper in this module that talks to the node.
let web3;

// Module-wide broadcast lock. A worker sets it to `true` when it starts
// preparing a transaction; it is set back to `false` once the transaction
// hash has been received (or the send failed), so that only one transaction
// is being prepared and broadcast at a time.
let mutexLock = false;
/**
 * Signs the transaction described by a task and returns it as a raw hex string.
 *
 * The validation checks are reported with `console.assert`, which in Node.js
 * only prints a message and never throws, so a failed check does not stop
 * the transaction from being serialized and returned.
 *
 * @param {Object} task
 * @param {Object} task.txOpt - Options passed unchanged to the `Tx` constructor.
 * @param {string} task.privateKey - Hex encoded private key, with or without a `0x` prefix.
 * @param {string} [task.account] - Expected sender address. When present it is
 *   compared (case-insensitively) with the sender recovered from the signature.
 * @returns {string} `0x`-prefixed hex encoding of the serialized, signed transaction.
 * @throws {TypeError} If `task` is missing or `task.privateKey` is not a string.
 */
const signTask = task => {
  if (!task || typeof task.privateKey !== 'string') {
    // Previously surfaced as an obscure "cannot read property 'replace'" TypeError.
    throw new TypeError('signTask: task.privateKey must be a hex string');
  }

  const tx = new Tx(task.txOpt);
  tx.sign(Buffer.from(task.privateKey.replace(/^0x/, ''), 'hex'));

  const valid = tx.validate();
  const verify = tx.verifySignature();
  const from = tx.getSenderAddress();

  console.assert(valid, 'Transaction invalid');
  console.assert(verify, 'Signature invalid');
  console.assert(from, 'Sender invalid');

  // Warn when the key does not belong to the account the task claims to be
  // sent from. Addresses are compared without prefix and without case because
  // checksummed addresses use mixed case.
  if (from && typeof task.account === 'string') {
    const normalise = address => address.replace(/^0x/i, '').toLowerCase();
    console.assert(
      normalise(from.toString('hex')) === normalise(task.account),
      'Sender mismatch'
    );
  }

  return `0x${tx.serialize().toString('hex')}`;
};
// `hashCallback`, `receiptCallback` | |
/**
 * Broadcasts a raw signed transaction and settles once it has progressed far
 * enough for the caller to move on.
 *
 * - When the node reports the transaction hash, the module-level `mutexLock`
 *   is released so the next queued transaction can be prepared.
 * - With `throttle > 0` the promise resolves with `{ transactionHash }`
 *   `throttle` milliseconds after the hash arrives (or with the receipt, if
 *   that arrives first).
 * - Otherwise the promise resolves with the receipt. If web3 gives up with a
 *   "transaction was not mined within ..." error, the receipt is polled every
 *   500 ms until the transaction has a block number.
 *
 * The lock is released at most once per call. An error that arrives after the
 * hash (for example the mining timeout) therefore cannot clear a lock that
 * another task has taken in the meantime.
 *
 * @param {string} signedTx - `0x`-prefixed raw signed transaction.
 * @param {number} [throttle] - Milliseconds to wait after the hash instead of waiting for the receipt.
 * @returns {Promise<Object>} The receipt, or `{ transactionHash }` when throttled.
 */
const sendSignedTask = (signedTx, throttle) => {
  if (!signedTx) return Promise.reject(new Error('Missing signedTx'));

  return new Promise((resolve, reject) => {
    // Saved in case web3 times out and we need to poll for the receipt.
    let txHash;

    // Tracks whether THIS send has already handed the lock back.
    let lockReleased = false;
    const releaseLock = () => {
      if (lockReleased) return;
      lockReleased = true;
      mutexLock = false;
    };

    let polls = 0;
    // TODO: Back-off, or timeout?
    const pollReceipt = async () => {
      try {
        const receipt = await web3.eth.getTransactionReceipt(txHash);
        // Log periodically
        if (polls % 250 === 0) console.log(txHash, receipt, polls);
        polls++;
        if (receipt && receipt.blockNumber > 0) {
          resolve(receipt);
          return;
        }
        setTimeout(pollReceipt, 500);
      } catch (pollErr) {
        // Surface RPC failures instead of leaving the promise pending forever.
        reject(pollErr);
      }
    };

    const onError = err => {
      const message = err && err.message ? String(err.message) : String(err);
      console.error(message);

      // Only has an effect when the failure happened before the hash arrived.
      releaseLock();

      const timedOut = message
        .toLowerCase()
        .includes('transaction was not mined within');

      // Keep waiting, but only when there is a hash to look up.
      if (!throttle && timedOut && txHash) {
        pollReceipt();
        return;
      }

      reject(err instanceof Error ? err : new Error(message));
    };

    let promiEvent;
    try {
      promiEvent = web3.eth.sendSignedTransaction(signedTx);
    } catch (err) {
      // A synchronous failure must not leave the lock held.
      releaseLock();
      reject(err);
      return;
    }

    promiEvent
      .once('transactionHash', hash => {
        // Once we have the hash, release the lock so the next nonce can be sent.
        releaseLock();
        txHash = hash;
        // TODO: Separate from `callback` another `hashCallback`?
        // TODO: Emit an event or callback to tell outside we've received the hash?
        // TODO: Return the PromiEvent?
        // Wait for the throttle instead of waiting for the receipt.
        if (throttle > 0) {
          setTimeout(() => {
            resolve({ transactionHash: txHash });
          }, throttle);
        }
      })
      .once('receipt', receipt => {
        resolve(receipt);
      })
      .catch(onError);
  });
};
//export default | |
module.exports = function(web3js, concurrency, throttle, callback) { | |
web3 = web3js; | |
const worker = (task, workerDone) => { | |
console.log('worker', task); | |
console.assert(task, 'Invalid task'); | |
console.assert(task.account, 'Invalid account'); | |
console.assert(task.privateKey, 'Invalid privateKey'); | |
//console.assert(task.txOpt.nonce, 'Invalid nonce'); | |
console.assert(task.txOpt.gasLimit, 'Invalid gasLimit'); | |
console.assert(task.txOpt.gasPrice, 'Invalid gasPrice'); | |
const _processTask = async () => { | |
mutexLock = true; | |
// Queue priority is separate to nonce, if left undefined we can count. | |
if (!task.txOpt.nonce) | |
task.txOpt.nonce = await web3.eth.getTransactionCount( | |
task.account, | |
'pending' | |
); | |
console.log('nonce', task.account, task.txOpt.nonce); | |
// EIP 155 `chainId` | |
const netId = await web3.eth.net.getId(); | |
task.txOpt.chainId = web3.utils.toHex(netId); | |
const signedTx = signTask(task); | |
// TODO: This is resolve/reject with response | |
const receipt = await sendSignedTask(signedTx, throttle); | |
callback(receipt); | |
workerDone(); | |
//setTimeout(workerDone, 30000); | |
}; | |
// Waits until transaction is broadcast before starting another. | |
const _waitMutex = () => { | |
if (!mutexLock) return _processTask(); | |
setTimeout(() => { | |
_waitMutex(); | |
}, 500); | |
}; | |
_waitMutex(); | |
}; | |
// Start with a priority queue | |
return asyncPriorityQueue(worker, concurrency); | |
}; |
// Sign up for free to join this conversation on GitHub.
// Already have an account? Sign in to comment.