Skip to content

Instantly share code, notes, and snippets.

@vladgovor77771
Last active August 5, 2020 00:01
Show Gist options
  • Save vladgovor77771/df181ee36f4613ba98fb03ba6497cff6 to your computer and use it in GitHub Desktop.
Save vladgovor77771/df181ee36f4613ba98fb03ba6497cff6 to your computer and use it in GitHub Desktop.
Chained queue implementation
const BullQueue = require('bull');
const EventEmitter = require('events');
const createUid = require('uid');
/**
 * Waits for whichever of two events fires first on `emitter`.
 *
 * The listener for the other event is removed as soon as one of them fires,
 * so no listener is left behind once the promise has settled.
 *
 * @param {EventEmitter} emitter - Emitter to subscribe to.
 * @param {string} successEvent - Event whose payload resolves the promise.
 * @param {string} failureEvent - Event whose payload rejects the promise.
 * @returns {Promise<*>} Settles with the payload of the first event emitted.
 */
function settleOnce(emitter, successEvent, failureEvent) {
  return new Promise((resolve, reject) => {
    const onSuccess = (payload) => {
      emitter.removeListener(failureEvent, onFailure);
      resolve(payload);
    };
    const onFailure = (payload) => {
      emitter.removeListener(successEvent, onSuccess);
      reject(payload);
    };
    emitter.once(successEvent, onSuccess);
    emitter.once(failureEvent, onFailure);
  });
}

/**
 * Bull queue whose `add` accepts a chain of job specs: each following job is
 * enqueued only after the previous one has completed.
 *
 * Job outcomes are taken from the queue's `global:completed` and
 * `global:failed` events and re-emitted on `this.ee` as
 * `job:<id>:completed` / `job:<id>:failed`. Chain outcomes are emitted on
 * `this.ee` as `job_chain:<chainId>:completed` / `job_chain:<chainId>:failed`.
 */
class ChainedQueue extends BullQueue {
  constructor(...args) {
    super(...args);
    this.ee = new EventEmitter();
    // chainId -> promise for the chain outcome, shared by every job of the
    // chain so that only one pair of chain listeners exists per chain.
    this.chainOutcomes = new Map();
    this.on('global:completed', (jobId, result) => this.ee.emit(`job:${jobId}:completed`, result));
    this.on('global:failed', (jobId, err) => this.ee.emit(`job:${jobId}:failed`, err));
  }

  /**
   * Enqueues `_job` and arranges for the `next` job specs to be enqueued one
   * after another, each once its predecessor has completed.
   *
   * @param {object} _job - Job spec.
   * @param {string} [_job.name='__default__'] - Name passed to the parent `add`.
   * @param {object} [_job.data={}] - Data passed to the parent `add`.
   * @param {object} [_job.options={}] - Options passed to the parent `add`.
   * @param {boolean} [_job.appendNextJobData=false] - When true, this job's
   *   result is merged into the next job's data; keys already present in the
   *   next job's data win. Neither the result nor the spec is mutated.
   * @param {Array} [_job.trace=[]] - Chain history; used when the method
   *   re-invokes itself for the following job.
   * @param {string} [_job.chainId] - Chain identifier; generated when omitted.
   * @param {...object} next - Specs of the jobs that follow, in order.
   * @returns {Promise<object>} The job returned by the parent `add`, extended
   *   with `await()` (resolves with this job's parsed result, rejects with an
   *   `Error`) and `awaitAll()` (resolves with `{ result, trace }` of the whole
   *   chain, rejects with `{ err, trace }`). Both return the same promise on
   *   every call and may be called after the outcome is already known.
   */
  async add(_job, ...next) {
    const {
      name = '__default__',
      data = {},
      options = {},
      appendNextJobData = false,
      trace = [],
      chainId = createUid(10),
    } = _job;
    const [nextJob, ...remaining] = next;
    const isLast = next.length === 0;

    const job = await super.add(name, data, options);

    const chainCompleted = `job_chain:${chainId}:completed`;
    const chainFailed = `job_chain:${chainId}:failed`;

    let chainOutcome = this.chainOutcomes.get(chainId);
    if (chainOutcome === undefined) {
      chainOutcome = settleOnce(this.ee, chainCompleted, chainFailed);
      const forget = () => {
        this.chainOutcomes.delete(chainId);
      };
      // Attaching both handlers also marks a chain rejection as handled when
      // nobody ever calls awaitAll().
      chainOutcome.then(forget, forget);
      this.chainOutcomes.set(chainId, chainOutcome);
    }

    // Rejects with the raw failure reason, or with the parse error when the
    // completion payload is not valid JSON.
    const parsedResult = settleOnce(
      this.ee,
      `job:${job.id}:completed`,
      `job:${job.id}:failed`,
    ).then((raw) => JSON.parse(raw));

    const jobOutcome = parsedResult.catch((reason) => {
      throw reason instanceof Error ? reason : new Error(reason);
    });
    // Callers of await() still observe the rejection; this only prevents an
    // unhandled-rejection report when await() is never called.
    jobOutcome.catch(() => {});

    job.await = () => jobOutcome;
    job.awaitAll = () => chainOutcome;

    parsedResult.then(
      async (result) => {
        trace.push({ name, data, result });
        if (isLast) {
          this.ee.emit(chainCompleted, { result, trace });
          return;
        }
        try {
          // Build a fresh spec instead of mutating the caller's object or the
          // result that was just recorded in the trace.
          const followUp = appendNextJobData
            ? { ...nextJob, data: { ...result, ...nextJob.data } }
            : nextJob;
          await this.add({ ...followUp, trace, chainId }, ...remaining);
        } catch (err) {
          // The next job could not be enqueued: fail the chain rather than
          // leaving awaitAll() pending forever.
          this.ee.emit(chainFailed, { err, trace });
        }
      },
      (err) => {
        trace.push({ name, data, err });
        this.ee.emit(chainFailed, { err, trace });
      },
    );

    return job;
  }
}
module.exports = ChainedQueue;
const { redis } = require('../config');
const sleep = require('sleep-promise');
const ChainedQueue = require('./chained_queue');
// Demo queue connected to the Redis instance described by the shared config.
const queue = new ChainedQueue('test_queue', {
  redis: {
    port: redis.port,
    host: redis.host,
    password: redis.password,
  },
});

/**
 * Builds the processor used by every demo stage: it logs the stage label and
 * the job data, waits five seconds, and returns the incoming counter plus one.
 *
 * @param {string} label - Stage name printed in front of the job data.
 * @returns {Function} Async processor to hand to `queue.process`.
 */
const makeIncrementProcessor = (label) => async (job) => {
  console.log(label, job.data);
  let value = job.data.counter;
  await sleep(5000);
  value++;
  return { counter: value };
};

// Register one processor per stage, each with a concurrency of 1.
['A', 'B', 'C'].forEach((label) => {
  queue.process(label, 1, makeIncrementProcessor(label));
});
/**
 * Demo: enqueues the chain A -> B -> C, where A and B forward their result
 * into the next job's data, then prints the first job's result and the
 * outcome of the whole chain.
 *
 * @returns {Promise<void>} Rejects when the first job or the chain fails.
 */
const run = async () => {
  const counter = 1;
  const incrementJob = await queue.add({
    name: 'A',
    data: { counter },
    appendNextJobData: true,
  }, {
    name: 'B',
    appendNextJobData: true,
  }, {
    name: 'C',
  });
  const [firstJobResult, chainResult] = await Promise.all([
    incrementJob.await(),
    incrementJob.awaitAll(),
  ]);
  console.log(firstJobResult);
  /*
  { counter: 2 }
  */
  console.log(chainResult);
  /*
  {
    result: { counter: 4 },
    trace: [
      { name: 'A', data: { counter: 1 }, result: { counter: 2 } },
      { name: 'B', data: { counter: 2 }, result: { counter: 3 } },
      { name: 'C', data: { counter: 3 }, result: { counter: 4 } },
    ]
  }
  */
};

// Report a failed job or chain instead of leaving the promise unhandled.
run().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment