Last active
January 9, 2022 10:13
-
-
Save Eunovo/c062954fd17ecee45a36e5c7ae0cd925 to your computer and use it in GitHub Desktop.
Code describing a system to handle CPU-intensive jobs on NodeJS servers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class JobProcessor { | |
private assignedJobs = new Map<String, any>(); | |
private queue = new Queue<any>(); | |
private nWorkers = 5; | |
async loadOutstandingJobs() { | |
// load 'pending' or 'processing' jobs from DB | |
const jobs = await services.Job | |
.findMany({ | |
$or: [ | |
{ status: 'pending' }, { status: 'processing' } | |
] | |
}); | |
jobs.forEach(job => this.queue.enqueue(job)); | |
} | |
async registerJob(job: any) { | |
// save job to DB before it is added to queue | |
const _id = await services.Job.create({ ...job, status: 'pending' }); | |
this.queue.enqueue({ ...job, _id }); | |
} | |
async processJobs() { | |
const workers = new WorkerPool(this.nWorkers); | |
workers.init(); | |
workers.on('message', async ({ id, message, status, data }) => { | |
if (message === WorkerMessage.job_complete) { | |
const job = this.assignedJobs.get(id); | |
this.assignedJobs.set(id, null); | |
// update job status | |
services.Job | |
.updateOne({ status, data }, { _id: job._id }); | |
} | |
const newJob: any = await this.queue.dequeue(); | |
workers.send(id, newJob); | |
this.assignedJobs.set(id, newJob); | |
// update job status | |
services.Job | |
.updateOne({ status: 'processing' }, { _id: newJob._id }); | |
}); | |
workers.on('exit', (id) => { | |
const ongoingJob = this.assignedJobs.get(id); | |
if (!ongoingJob) return; | |
// Re-queue the job that wasn't finished | |
this.queue.enqueue(ongoingJob); | |
}); | |
} | |
} | |
export const jobProcessor = new JobProcessor(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export class Observable<T> { | |
private subscribers: Array<(data: T) => void> = []; | |
push(data: T) { | |
this.subscribers | |
.forEach((listener) => listener(data)); | |
} | |
subscribe(listener: (data: T) => void) { | |
this.subscribers = [...this.subscribers, listener]; | |
return () => { | |
this.subscribers = this.subscribers | |
.filter((value) => value !== listener); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable } from "./Observable"; | |
enum QueueEvents { | |
enqueue = 'enqueue', | |
dequeue = 'dequeue' | |
} | |
export class Queue<T> { | |
private observable: Observable<QueueEvents> = new Observable(); | |
private items: T[] = []; | |
enqueue(item: T) { | |
this.items.push(item); | |
this.observable.push(QueueEvents.enqueue); | |
} | |
async dequeue() { | |
if (this.items.length > 0) { | |
const currentItem = this.items[0]; | |
this.items = this.items.filter((_, index) => index !== 0); | |
this.observable.push(QueueEvents.dequeue); | |
return currentItem; | |
} | |
return new Promise((resolve) => { | |
const unsubscribe = this.observable.subscribe(async (message) => { | |
if (message !== QueueEvents.enqueue) return; | |
resolve(await this.dequeue()); | |
unsubscribe(); | |
}); | |
}) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { workerData, parentPort } from "worker_threads"; | |
import { WorkerMessage } from "./WorkerMessage"; | |
parentPort.on('message', async (job) => { | |
const { data } = job; | |
try { | |
// process job here | |
parentPort.postMessage({ | |
message: WorkerMessage.job_complete, | |
status: 'completed', | |
data: { ...data, resultId } | |
}); | |
} catch (error) { | |
parentPort.postMessage({ | |
message: WorkerMessage.job_complete, | |
status: 'failed', | |
data: { ...data, error: error.message } | |
}); | |
} | |
}); | |
parentPort.postMessage({ message: WorkerMessage.request_job }); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export enum WorkerMessage { | |
request_job = 'request_job', | |
job_complete = 'job_complete' | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { v4 } from "uuid"; | |
import { Worker } from "worker_threads"; | |
import { Observable } from "./Observable"; | |
export class WorkerPool { | |
private observable = new Observable<{ | |
event: "message" | "exit", | |
data?: any | |
}>(); | |
private workers: Map<string, Worker> = new Map(); | |
constructor( | |
private nWorkers: number | |
) { } | |
init() { | |
for (let i = 0; i < this.nWorkers; i++) { | |
this.createWorker(); | |
} | |
} | |
private createWorker() { | |
const worker = new Worker(`${__dirname}/worker.js`); | |
const id = v4(); | |
this.workers.set(id, worker); | |
worker.on("message", (value) => { | |
this.observable.push({ event: "message", data: { id, ...value } }); | |
}); | |
worker.on("exit", () => { | |
this.observable.push({ event: "exit" }); | |
this.workers.delete(id); | |
// Create another worker to replace the closing worker | |
this.createWorker(); | |
}) | |
} | |
send(id: string, data: any) { | |
const worker = this.workers.get(id); | |
worker?.postMessage(data); | |
} | |
on(evt: string, handler: Function) { | |
this.observable.subscribe((value) => { | |
const { event, data } = value; | |
if (evt === event) { | |
handler(data); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment