Last active
June 12, 2021 12:45
-
-
Save jirevwe/e870ed84512377973cb7618aa298798b to your computer and use it in GitHub Desktop.
NodeJs Worker Pool with Callback
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Returns a promise that settles once `ms` milliseconds have elapsed.
 * When no duration is supplied the delay is one second.
 */
const sleep = (ms = 1000) => {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
};
/**
 * Demo entry point.
 *
 * Builds a `ThreadPool` backed by `./worker/index.js`, seeds it with two
 * numbers and, for every result that comes back, enqueues the double and the
 * triple of that number until the values reach one million. When the pool
 * reports that it has been idle for `threadTimeout`, the process exits.
 */
async function start() {
  const execFile = join(__dirname, './worker/index.js');

  const pool = new ThreadPool<number, number>({
    execFile,
    jobCallback,
    timeOutCallback,
    debug: true,
    threadTimeout: 3
  });

  pool.run(0);
  pool.run(1);

  /** Invoked by the pool once it has been idle long enough: stop the demo. */
  function timeOutCallback() {
    process.exit(0);
  }

  /**
   * Handles one result from a worker: logs it, waits two seconds, then fans
   * out two follow-up tasks.
   *
   * @param payload the number posted back by the worker
   */
  async function jobCallback(payload: number) {
    console.log(payload);
    await sleep(2000);

    // Multiplying a non-positive number never approaches the upper bound
    // (0 * n is still 0), so such a payload would re-enqueue itself forever.
    // Only positive payloads are allowed to fan out.
    if (payload > 0 && payload < 1_000_000) {
      pool.run(payload * 2);
      pool.run(payload * 3);
    }
  }
}

// Surface start-up failures instead of leaving a floating promise.
start().catch((error) => {
  console.error(error);
  process.exit(1);
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Worker-side message handler.
 *
 * Appends a " — (processed)" marker to every enumerable property of the
 * incoming payload (mutating it in place) and posts the payload back through
 * `parentPort`.
 *
 * @param payload the request received from the parent thread
 */
function processThisShit(payload: WorkerRequest) {
  const marker = ' — (processed)';

  for (const field in payload) {
    payload[field] += marker;
  }

  parentPort.postMessage(payload);
}

// Run the handler for every message delivered on the parent port.
parentPort.on('message', processThisShit);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Worker as Thread } from 'worker_threads'; | |
import os from 'os'; | |
/**
 * Options accepted by the `ThreadPool` constructor.
 *
 * `Task` is the payload type posted to a worker; `Result` is the message type
 * a worker posts back.
 */
export interface ThreadPoolOptions<Task, Result> {
  /**
   * The result callback, executed on the main thread for every message a
   * worker posts back. Its return value is ignored, so plain `void` functions
   * and `async` functions are both accepted.
   */
  jobCallback: (result: Result) => unknown;
  /**
   * Called after every thread has been idle, with an empty queue, for
   * `threadTimeout`.
   */
  timeOutCallback: () => void;
  /** Path of the file containing the worker function. */
  execFile: string;
  /** Idle timeout in seconds; the pool falls back to 10 seconds when unset. */
  threadTimeout?: number;
  /**
   * The number of threads to spawn; the pool falls back to twice the number
   * of CPUs reported by `os.cpus()` when unset.
   */
  threadCount?: number;
  /** Initial tasks that will be seeded on the queue. */
  tasks?: Task[];
  /** Enables the pool's diagnostic `console.log` output. */
  debug?: boolean;
}
/**
 * A fixed-size pool of `worker_threads` workers fed from a FIFO task queue.
 *
 * Each task is posted to one idle worker; the worker is expected to post
 * exactly one message back, which is handed to `options.jobCallback` on the
 * main thread before the worker is returned to the idle list. Once every live
 * worker is idle and the queue is empty, `options.timeOutCallback` is
 * scheduled to run after the idle timeout.
 *
 * Workers that exit (for example after an uncaught exception) are removed
 * from the pool; they are not replaced, and the task they were running is
 * not retried.
 */
export class ThreadPool<Task, Result> {
  /** Live worker threads owned by the pool. */
  private threads: Thread[] = [];
  /** Workers currently waiting for a task, longest-idle first. */
  private idle: Thread[] = [];
  /** Tasks waiting for an idle worker, in submission order. */
  private queue: Task[] = [];
  /** Pending idle-timeout timer, if one is armed. */
  private timeout: ReturnType<typeof setTimeout> | undefined;
  /** Whether diagnostic logging is enabled. */
  private readonly debug: boolean;
  /** Idle timeout converted to milliseconds. */
  private readonly idleTimeoutMs: number;

  /**
   * Spawns the workers and starts on any seeded tasks.
   *
   * The `options` object is only read, never modified, so the same object can
   * safely be reused to build another pool.
   *
   * @param options pool configuration; `threadTimeout` is in seconds
   *   (default 10) and `threadCount` defaults to twice the CPU count
   */
  constructor(private readonly options: ThreadPoolOptions<Task, Result>) {
    const { tasks, debug, execFile, threadCount, threadTimeout } = options;

    this.debug = Boolean(debug);
    // A missing or zero value selects the default, as before.
    this.idleTimeoutMs = (threadTimeout ? threadTimeout : 10) * 1000;
    const size = threadCount ? threadCount : os.cpus().length * 2;

    if (tasks) {
      // Copy element by element: spreading a very large array into push()
      // can exceed the engine's argument limit.
      for (const task of tasks) {
        this.queue.push(task);
      }
    }

    for (let i = 0; i < size; i++) {
      const worker = new Thread(execFile);
      this.threads.push(worker);
      this.idle.push(worker);
      this.registerCallbacks(worker);
    }

    // Seeded tasks should not have to wait for the first run() call.
    if (this.queue.length > 0) {
      this.runNext();
    }
  }

  /**
   * Adds work to the pool and tries to execute it
   *
   * @param payload the task to enqueue; any value is accepted, including
   *   falsy ones such as `0` or `''`
   */
  run(payload: Task) {
    this.queue.push(payload);
    this.runNext();
  }

  /**
   * Hands queued tasks to idle workers until one of the two runs out, then
   * arms the idle timer if the pool has nothing left to do.
   */
  private runNext() {
    if (this.debug) {
      console.log(`queue: ${this.queue.length}, idle: ${this.idle.length}`);
    }

    while (this.queue.length > 0 && this.idle.length > 0) {
      // we have more jobs, don't stop the threads
      this.clearIdleTimer();

      // Both lengths were just checked, so shift() cannot come back empty.
      // Testing the length (not the value) keeps falsy payloads such as 0.
      const payload = this.queue.shift() as Task;
      const worker = this.idle.shift() as Thread;

      this.formatDebugMessage(payload, 'task');

      // give the worker work to do
      worker.postMessage(payload);
    }

    // the threads are idle and the queue is empty
    if (
      this.queue.length === 0 &&
      this.idle.length === this.threads.length &&
      this.timeout === undefined
    ) {
      const { timeOutCallback } = this.options;
      this.timeout = setTimeout(() => {
        // Forget the handle first so a later idle period can arm a new timer.
        this.timeout = undefined;
        timeOutCallback();
      }, this.idleTimeoutMs);
    }
  }

  /** Cancels the pending idle timer, if any. */
  private clearIdleTimer() {
    if (this.timeout !== undefined) {
      clearTimeout(this.timeout);
      this.timeout = undefined;
    }
  }

  /**
   * Returns a worker to the idle list (at most once, and only while it is
   * still part of the pool) and then tries to dispatch more work.
   */
  private release(worker: Thread) {
    if (this.threads.includes(worker) && !this.idle.includes(worker)) {
      this.idle.push(worker);
    }
    this.runNext();
  }

  /**
   * Logs a task or result when debug output is enabled. Objects are rendered
   * with `JSON.stringify`; everything else is interpolated as-is.
   */
  private formatDebugMessage(payload: Task | Result, type: 'task' | 'result') {
    if (!this.debug) return;

    const data =
      typeof payload === 'object' ? JSON.stringify(payload) : payload;

    console.log(
      `${type} ${type === 'task' ? 'received' : 'sent'}, data: (${data})`
    );
  }

  /**
   * Wires the worker's lifecycle events into the pool's bookkeeping.
   *
   * @param worker a freshly spawned worker owned by this pool
   */
  private registerCallbacks(worker: Thread) {
    // Captured up front: the id may no longer be readable from the worker
    // object once the thread has exited.
    const threadId = worker.threadId;

    worker.on('message', (result: Result) => {
      this.formatDebugMessage(result, 'result');
      try {
        this.options.jobCallback(result);
      } finally {
        // Free the worker even if the callback throws synchronously, so the
        // slot is not lost; the exception itself still propagates.
        this.release(worker);
      }
    });

    worker.on('error', (error) => {
      console.log(
        `thread with thread id (${threadId}) received an error event: ${error.message}`
      );
      // Do not mark the worker idle here: an uncaught exception terminates
      // the thread, so it must not be given more work. The 'exit' handler
      // below retires it from the pool.
    });

    worker.on('messageerror', (error) => {
      console.log(
        `thread with thread id (${threadId}) received a messageerror event: (${error.message})`
      );
      // NOTE(review): the worker is not returned to the idle list here; if a
      // worker's only reply fails to deserialize its slot stays busy — confirm
      // whether that is intended.
    });

    worker.on('online', () => {
      if (this.debug) {
        console.log(`thread with id (${threadId}) online`);
      }
    });

    worker.on('exit', (code) => {
      console.log(
        `thread with thread id (${threadId}) exited with error code (${code})`
      );

      // Retire the dead worker so it is never picked for a task again, then
      // re-evaluate: the remaining workers may now all be idle.
      this.threads = this.threads.filter((it) => it !== worker);
      this.idle = this.idle.filter((it) => it !== worker);
      this.runNext();
    });
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment