Skip to content

Instantly share code, notes, and snippets.

@gate3
Last active September 13, 2019 16:01
Show Gist options
  • Save gate3/7d2fb107126cf16ae5a6e58c258d19f4 to your computer and use it in GitHub Desktop.
Save gate3/7d2fb107126cf16ae5a6e58c258d19f4 to your computer and use it in GitHub Desktop.
require('dotenv').config()
const redis = require('redis')
const QueueHelper = require('../../util/rabbitmq.helper');
const modelContainer = require('../models')
// Shared Redis client that BaseWorker uses to publish per-message progress
// updates. createClient() is called without options, so it relies on the
// library's default connection settings (localhost).
const pub = redis.createClient(); // creating for localhost
/**
 * Worker that consumes the RabbitMQ queue `<type>:queue`, stores every message
 * through `model.create`, acknowledges it, and publishes a progress update on
 * the Redis channel `channel:<type>`.
 */
class BaseWorker {
  /**
   * @param {string} type - Logical job type; used to derive both the queue
   *   name (`<type>:queue`) and the progress channel (`channel:<type>`).
   * @param {{create: function(object): Promise<*>}} model - Persistence target;
   *   `create` is called with the parsed JSON body of each message.
   */
  constructor (type, model) {
    this.type = type;
    this.QueueName = `${type}:queue`;
    this.rb = new QueueHelper(this.QueueName); // Named the queue after the type
    this.model = model;
  }

  /**
   * Sets up the queue helper and registers a consumer on the queue.
   *
   * For every delivered message the consumer parses the JSON body, calls
   * `model.create`, acks the message, and publishes
   * `{count, item_id, messageCount}` to `channel:<type>`.
   *
   * `messageCount` is the value reported by the queue assertion when the
   * worker starts; messages enqueued afterwards are still processed but push
   * `count` past `messageCount`.
   *
   * Failures while handling a message are logged and the message is left
   * unacknowledged; they never reject the consumer callback.
   *
   * @returns {Promise<void>} Resolves once the consumer has been registered.
   */
  async runQueue () {
    let count = 0;
    await this.rb.setup();
    const { messageCount } = await this.rb.queueAssert;

    // Arrow functions keep `this` bound to the worker; a plain `function`
    // handed to setImmediate would lose it.
    await this.rb.channel.consume(this.QueueName, async (msg) => {
      // Consumers may be invoked with a null message when the broker cancels
      // them (amqplib behaviour — assumes QueueHelper wraps an amqplib channel).
      if (msg == null) {
        return;
      }

      try {
        // Defer the actual work to the next event-loop iteration, while keeping
        // it inside this try block so failures are caught below.
        await new Promise((resolve) => setImmediate(resolve));

        const queuedData = JSON.parse(msg.content.toString());
        const { item_id } = queuedData;

        await this.model.create(queuedData);
        this.rb.channel.ack(msg);

        // Snapshot the counter: other messages may bump `count` while the
        // publish below is awaited.
        count += 1;
        const processed = count;

        await pub.publish(
          `channel:${this.type}`,
          JSON.stringify({ count: processed, item_id, messageCount })
        );

        if (processed === messageCount) {
          // At this point the queue has finished and you can perform any clean up etc here
        }
      } catch (err) {
        // NOTE(review): the message stays unacked on failure; decide whether it
        // should be nack'ed / dead-lettered instead.
        console.error(`[${this.QueueName}] failed to process message`, err);
      }
    });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment