Last active
February 27, 2022 19:05
-
-
Save ardeshireshghi/94c79513186d51b3294cb0daba6b8821 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { | |
WorkerPool | |
} = require('./WorkerPool'); | |
const pool = new WorkerPool({ | |
size: 24, | |
taskPath: __dirname + '/test-worker.js' | |
}); | |
const promises = [...Array(10000).keys()].map(() => { | |
return pool.run({ data: Math.round(Math.random() * 10000) + 1000000 }); | |
}); | |
(async () => { | |
const start = Date.now(); | |
await Promise.all(promises); | |
console.log( | |
'It took %s seconds with %s workers', | |
(Date.now() - start) / 1000, | |
pool.size | |
); | |
await pool.terminate(); | |
process.exit(); | |
})(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { isMainThread, parentPort } = require('worker_threads'); | |
function calc(max) { | |
let j = 0; | |
for (var i = 0; i < max; i++) { | |
j += i; | |
} | |
return j; | |
} | |
if (!isMainThread) { | |
parentPort.on('message', (message) => { | |
parentPort.postMessage(calc(message.data)); | |
}); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import EventEmitter from 'events'; | |
import { Worker } from 'worker_threads'; | |
class CustomWorker extends Worker { | |
public isReady: boolean; | |
constructor(...args: ConstructorParameters<typeof Worker>) { | |
super(...args); | |
this.isReady = true; | |
} | |
run(workerData) { | |
this.isReady = false; | |
return new Promise((resolve, reject) => { | |
const onMessage = (message) => { | |
resolve(message); | |
this.removeListener('error', onError); | |
this.isReady = true; | |
this.emit('ready', this); | |
}; | |
const onError = (err) => { | |
reject(err); | |
this.removeListener('message', onMessage); | |
}; | |
this.once('error', onError); | |
this.once('message', onMessage); | |
this.postMessage(workerData); | |
}); | |
} | |
} | |
export class WorkerPool extends EventEmitter { | |
public size: number; | |
public taskPath: string; | |
public workers: CustomWorker[]; | |
public taskQueue: any[]; | |
constructor({ size, taskPath }: { size: number; taskPath: string }) { | |
super(); | |
this.size = size; | |
this.taskPath = taskPath; | |
this.taskQueue = []; | |
this.workers = this._createWorkers(); | |
} | |
public async run(workerData: any) { | |
return new Promise((resolve, reject) => { | |
this.taskQueue.push({ workerData, resolve, reject }); | |
this.executeTask(); | |
}); | |
} | |
public async terminate() { | |
this.workers = []; | |
this.removeAllListeners(); | |
await Promise.all(this.workers.map((worker) => worker.terminate())); | |
} | |
private _createWorkers() { | |
return Array.from(Array(this.size).keys()).map(() => { | |
return this._createWorker(); | |
}); | |
} | |
private _createWorker() { | |
const worker = new CustomWorker(this.taskPath); | |
worker.on('ready', (theWorker) => { | |
this.runTask(theWorker); | |
}); | |
worker.once('exit', (code) => { | |
if (code !== 0) { | |
worker.terminate(); | |
this.workers[this.workers.indexOf(worker)] = new CustomWorker( | |
this.taskPath | |
); | |
} | |
}); | |
return worker; | |
} | |
private async executeTask() { | |
const freeWorker = this.workers.find((worker) => worker.isReady); | |
if (freeWorker) { | |
this.runTask(freeWorker); | |
} | |
} | |
private runTask(worker) { | |
if (this.taskQueue.length > 0) { | |
const { workerData, resolve, reject } = this.taskQueue.shift(); | |
if (workerData) { | |
return worker.run(workerData).then(resolve).catch(reject); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment