Last active
August 5, 2020 00:01
-
-
Save vladgovor77771/df181ee36f4613ba98fb03ba6497cff6 to your computer and use it in GitHub Desktop.
Chained queue implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const BullQueue = require('bull'); | |
const EventEmitter = require('events'); | |
const createUid = require('uid'); | |
/**
 * Bull queue whose `add` accepts a sequence of job descriptors and runs them
 * one after another: each following job is enqueued only after the previous
 * one has completed.
 *
 * The job object returned by `add` is extended with two helpers:
 *   - `job.await()`    resolves with the parsed result of that single job,
 *                      rejects with an `Error` built from the failure reason.
 *   - `job.awaitAll()` resolves with `{ result, trace }` once the last job of
 *                      the chain completes, rejects with `{ err, trace }` as
 *                      soon as any job of the chain fails.
 *
 * Both helpers subscribe when they are called, so call them before the job
 * (or the chain) has a chance to settle.
 */
class ChainedQueue extends BullQueue {
  constructor(...args) {
    super(...args);
    // Internal bus: queue-wide events are re-published under per-job names so
    // that each job can be observed individually.
    this.ee = new EventEmitter();
    this.on('global:completed', (jobId, result) => this.ee.emit(`job:${jobId}:completed`, result));
    this.on('global:failed', (jobId, err) => this.ee.emit(`job:${jobId}:failed`, err));
  }

  /**
   * Enqueue `_job` and, once it completes, the jobs listed in `next`, in order.
   *
   * @param {object} _job - Descriptor of the job to enqueue now.
   * @param {string} [_job.name='__default__'] - Job name passed to the parent `add`.
   * @param {object} [_job.data={}] - Job payload.
   * @param {object} [_job.options={}] - Options passed to the parent `add`.
   * @param {boolean} [_job.appendNextJobData=false] - When true, this job's
   *   result is merged into the next job's data (keys already present in the
   *   next job's data win).
   * @param {Array} [_job.trace=[]] - Accumulated chain history; shared by
   *   every job of the chain and appended to as jobs settle.
   * @param {string} [_job.chainId] - Identifier shared by the whole chain;
   *   generated when omitted.
   * @param {...object} next - Descriptors of the jobs to run afterwards.
   * @returns {Promise<object>} The job created by the parent `add`, extended
   *   with `await()` and `awaitAll()`.
   */
  async add(_job, ...next) {
    const {
      name = '__default__',
      data = {},
      options = {},
      appendNextJobData = false,
      trace = [],
      chainId = createUid(10),
    } = _job;
    const isLast = next.length === 0;
    const job = await super.add(name, data, options);

    const completedEvent = `job:${job.id}:completed`;
    const failedEvent = `job:${job.id}:failed`;
    const chainCompletedEvent = `job_chain:${chainId}:completed`;
    const chainFailedEvent = `job_chain:${chainId}:failed`;

    // Subscribes to a success/failure pair of events. Whichever fires first
    // unregisters the other, so no listener is left behind on the emitter.
    const watch = (okEvent, failEvent, onOk, onFail) => {
      const handleOk = (payload) => {
        this.ee.removeListener(failEvent, handleFail);
        onOk(payload);
      };
      const handleFail = (payload) => {
        this.ee.removeListener(okEvent, handleOk);
        onFail(payload);
      };
      this.ee.once(okEvent, handleOk);
      this.ee.once(failEvent, handleFail);
    };

    // Enqueues the next descriptor. The merged payload is a fresh object:
    // neither the recorded result nor the caller's descriptor is modified.
    const enqueueNext = async (result) => {
      const [nextJob, ...rest] = next;
      const nextData = appendNextJobData ? { ...result, ...nextJob.data } : nextJob.data;
      await this.add({ ...nextJob, data: nextData, trace, chainId }, ...rest);
    };

    watch(
      completedEvent,
      failedEvent,
      (rawResult) => {
        const result = JSON.parse(rawResult);
        trace.push({ name, data, result });
        if (isLast) {
          this.ee.emit(chainCompletedEvent, { result, trace });
          return;
        }
        // A failure to enqueue the next job ends the chain; report it through
        // the chain's failure event instead of leaving a rejected promise
        // without a handler.
        enqueueNext(result).catch((error) => {
          const err = error instanceof Error ? error.message : String(error);
          this.ee.emit(chainFailedEvent, { err, trace });
        });
      },
      (err) => {
        trace.push({ name, data, err });
        this.ee.emit(chainFailedEvent, { err, trace });
      },
    );

    job.await = () => new Promise((resolve, reject) => {
      watch(
        completedEvent,
        failedEvent,
        (rawResult) => resolve(JSON.parse(rawResult)),
        (err) => reject(new Error(err)),
      );
    });

    job.awaitAll = () => new Promise((resolve, reject) => {
      watch(chainCompletedEvent, chainFailedEvent, resolve, reject);
    });

    return job;
  }
}
module.exports = ChainedQueue; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { redis } = require('../config'); | |
const sleep = require('sleep-promise'); | |
const ChainedQueue = require('./chained_queue'); | |
// Queue under test, connected to the Redis instance described in ../config.
const queue = new ChainedQueue('test_queue', {
  redis: {
    port: redis.port,
    host: redis.host,
    password: redis.password,
  },
});

/**
 * Builds a processor that logs its label together with the job data, waits
 * five seconds and returns the incoming counter increased by one.
 */
const makeIncrementProcessor = (label) => async (job) => {
  console.log(label, job.data);
  let counter = job.data.counter;
  await sleep(5000);
  counter++;
  return { counter };
};

// The three named processors behave identically; each handles one job at a time.
for (const label of ['A', 'B', 'C']) {
  queue.process(label, 1, makeIncrementProcessor(label));
}
/**
 * Demo: enqueues the chain A -> B -> C starting with `counter = 1`, then
 * waits for both the first job's own result and the result of the whole
 * chain, and prints them.
 */
const run = async () => {
  const counter = 1;
  const incrementJob = await queue.add(
    {
      name: 'A',
      data: { counter },
      appendNextJobData: true,
    },
    {
      name: 'B',
      appendNextJobData: true,
    },
    {
      name: 'C',
    },
  );
  // Both helpers subscribe when called, so request them together right after
  // the first job has been added.
  const [firstJobResult, chainResult] = await Promise.all([
    incrementJob.await(),
    incrementJob.awaitAll(),
  ]);
  console.log(firstJobResult);
  /*
  { counter: 2 }
  */
  console.log(chainResult);
  /*
  {
    result: { counter: 4 },
    trace: [
      { name: 'A', data: { counter: 1 }, result: { counter: 2 } },
      { name: 'B', data: { counter: 2 }, result: { counter: 3 } },
      { name: 'C', data: { counter: 3 }, result: { counter: 4 } },
    ]
  }
  */
};

// Report a failed job or chain instead of leaving the promise unhandled.
run().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment