Skip to content

Instantly share code, notes, and snippets.

@kraftwerk28
Last active September 10, 2024 13:39
Show Gist options
  • Save kraftwerk28/83d0cf9933037bb40d551b70e841aae1 to your computer and use it in GitHub Desktop.
Save kraftwerk28/83d0cf9933037bb40d551b70e841aae1 to your computer and use it in GitHub Desktop.
import { isMainThread, Worker, workerData } from "node:worker_threads";
import { connectAsync } from "mqtt";
import { once } from "node:events";
const WORKER_COUNT = 100;
const MSG_PER_WORKER = 500;
const LISTENER_QOS = 1;
const FLOODER_QOS = 1;
async function runFloodWorker() {
const { id } = workerData;
const mqtt = await connectAsync();
mqtt.on("offline", () => console.error(`Client ${id} is offline!`));
mqtt.on("error", e => console.error(e));
await Promise.allSettled(
Array.from({ length: MSG_PER_WORKER }).map((_, index) =>
mqtt.publishAsync(
`machine/${id}/test`,
JSON.stringify({ value: index }),
{ qos: FLOODER_QOS },
),
),
);
await mqtt.endAsync();
}
async function runFloodListener() {
const fileUrl = new URL(import.meta.url);
const mqtt = await connectAsync({
connectTimeout: 1000,
});
console.info("Connected to broker");
const stats = {};
mqtt.on("offline", () => console.error(`Client master is offline!`));
mqtt.on("error", e => console.error(e));
mqtt.on("message", (topic, payload, pkt) => {
const [, machineId] = topic.match(/machine\/([^\/]+)/);
stats[machineId] ??= new Set();
const { value } = JSON.parse(payload.toString());
stats[machineId].add(value);
});
await mqtt.subscribeAsync(`machine/+/test`, { qos: LISTENER_QOS });
console.info("Subscribed to topic");
const workers = Array.from({ length: WORKER_COUNT }).map((_, index) => {
const workerData = { id: index };
return new Worker(fileUrl, { workerData });
});
console.info(`Spawned ${workers.length} workers`);
await Promise.allSettled(workers.map(w => once(w, "exit")));
console.info("All workers are done");
console.table(
Object.entries(stats).map(([workerId, msgIds]) => ({
worker: workerId,
count: msgIds.size,
})),
);
await mqtt.endAsync();
}
if (isMainThread) runFloodListener();
else runFloodWorker();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment