在 promise 函数里,我们首先通过调用 .getInactiveWorkerId() 来检查是否存在空闲的 worker 可以来处理数据:
- private getInactiveWorkerId(): number {
- for (let i = 0; i < this.numberOfThreads; i += 1) {
- if (!this.activeWorkersById[i]) {
- return i;
- }
- }
- return -1;
- }
接下来,我们创建一个 queueItem,在其中保存传递给 .run() 方法的 getData 函数以及回调。在回调中,我们要么 resolve 或者 reject promise,这取决于 worker 是否将错误传递给回调。
如果 availableWorkerId 的值是 -1,意味着当前没有可用的 worker,我们将 queueItem 添加到 queue。如果有可用的 worker,则调用 .runWorker() 方法来执行 worker。
在 .runWorker() 方法中,我们必须把当前 worker 的 activeWorkersById 设置为使用状态;为 message 和 error 事件设置事件监听器(并在之后清理它们);最后将数据发送给 worker。
- private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {
- const worker = this.workersById[workerId];
- this.activeWorkersById[workerId] = true;
- const messageCallback = (result: N) => {
- queueItem.callback(null, result);
- cleanUp();
- };
- const errorCallback = (error: any) => {
- queueItem.callback(error);
- cleanUp();
- };
- const cleanUp = () => {
- worker.removeAllListeners('message');
- worker.removeAllListeners('error');
- this.activeWorkersById[workerId] = false;
- if (!this.queue.length) {
- return null;
- }
- this.runWorker(workerId, this.queue.shift());
- };
- worker.once('message', messageCallback);
- worker.once('error', errorCallback);
- worker.postMessage(await queueItem.getData());
- }
首先,通过使用传递的 workerId,我们从 workersById 中获得 worker 引用。然后,在 activeWorkersById 中,将 [workerId] 属性设置为true,这样我们就能知道在 worker 在忙,不要运行其他任务。
接下来,分别创建 messageCallback 和 errorCallback 用来在消息和错误事件上调用,然后注册所述函数来监听事件并将数据发送给 worker。
在回调中,我们调用 queueItem 的回调,然后调用 cleanUp 函数。在 cleanUp 函数中,要删除事件侦听器,,因为我们会多次重用同一个 worker。如果没有删除监听器的话就会发生内存泄漏,内存会被慢慢耗尽。
在 activeWorkersById 状态中,我们将 [workerId] 属性设置为 false,并检查队列是否为空。如果不是,就从 queue 中删除第一个项目,并用另一个 queueItem 再次调用 worker。
接着创建一个在收到 message 事件中的数据后进行一些计算的 worker:
- import { isMainThread, parentPort } from 'worker_threads';
- if (isMainThread) {
- throw new Error('Its not a worker');
- }
- const doCalcs = (data: any) => {
- const collection = [];
- for (let i = 0; i < 1000000; i += 1) {
- collection[i] = Math.round(Math.random() * 100000);
- }
- return collection.sort((a, b) => {
- if (a > b) {
- return 1;
- }
- return -1;
- });
- };
- parentPort.on('message', (data: any) => {
- const result = doCalcs(data);
- parentPort.postMessage(result);
- });
worker 创建了一个包含 100 万个随机数的数组,然后对它们进行排序。只要能够多花费一些时间才能完成,做些什么事情并不重要。
(编辑:ASP站长网)
|