Created
March 6, 2022 20:41
-
-
Save btd/743cc268ccaee0b7a8a68c25fe393d8a to your computer and use it in GitHub Desktop.
RabbitMQ Node.js token bucket example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example producer: publishes a persistent "Hello World!" message to the
// durable `task_queue` roughly every 100 ms, forever.
import amqp from "amqplib";

/**
 * Resolve after the given number of milliseconds.
 * @param {number} ms - Time to wait, in milliseconds.
 * @returns {Promise<void>}
 */
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();

const queue = "task_queue";
const msg = "Hello World!";

await channel.assertQueue(queue, {
  durable: true,
});

while (true) {
  // sendToQueue returns false when the channel's write buffer is full.
  const canKeepWriting = channel.sendToQueue(queue, Buffer.from(msg), {
    persistent: true,
  });
  console.log(" [x] Sent '%s'", msg);

  if (!canKeepWriting) {
    // Respect backpressure: wait for the channel to drain before sending more.
    await new Promise((resolve) => channel.once("drain", resolve));
  }

  await delay(100);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Token bucket publisher: refills a small, bounded, short-lived queue with
// `limit` tokens per second. Each token is a message whose body is the ISO
// timestamp at which it was written.
import amqp from "amqplib";

const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();

// Number of tokens published per second, and also the queue's maximum length.
const limit = 3;
const bucketQueue = `tokens${limit}`;

await channel.assertQueue(bucketQueue, {
  durable: true,
  // Each token is given a 1000 ms time-to-live in the queue.
  messageTtl: 1000,
  // The queue is capped at `limit` messages.
  maxLength: limit,
});

/**
 * Publish one token to the bucket queue and schedule the next publication
 * `1000 / limit` milliseconds later.
 */
function writeToken() {
  channel.sendToQueue(bucketQueue, Buffer.from(new Date().toISOString()), {
    persistent: true,
  });
  setTimeout(writeToken, 1000 / limit);
}

console.log("Starting token bucket publisher");
writeToken();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Rate-limited consumer: a message from `task_queue` is only processed when a
// token can be taken from the token bucket queue; otherwise it is nacked.
import amqp from "amqplib";

const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();

const queue = "task_queue";
// Must match the queue name used by the token publisher (`tokens${limit}`).
const bucketQueue = "tokens3";

await channel.assertQueue(queue, { durable: true });
// Have at most one unacknowledged task message in flight at a time.
await channel.prefetch(1);
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);

const consumer = await channel.consume(
  queue,
  async (msg) => {
    // amqplib invokes the callback with null when the broker cancels the consumer.
    if (msg === null) {
      return;
    }

    // Take one token. noAck lets the broker drop the token as soon as it is
    // delivered; without it every fetched token would stay unacknowledged on
    // this channel and be requeued when the channel closes.
    const token = await channel.get(bucketQueue, { noAck: true });

    if (token) {
      console.log(" [x] Received %s", msg.content.toString(), new Date());
      channel.ack(msg);
    } else {
      // No token available: hand the message back (nack requeues by default).
      channel.nack(msg);
    }
  },
  { noAck: false }
);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment