为了更好地描述我们将要做的事情,下面我们来创建一个由八个 thread worker 组成的工作池:
- const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);
如果你熟悉限制并发操作,那么你在这里看到的逻辑几乎相同,只是一个不同的用例。
如上面的代码片段所示,我们把指向 worker 的路径和要生成的 worker 数量传给了 WorkerPool 的构造函数。
- export class WorkerPool<T, N> {
- private queue: QueueItem<T, N>[] = [];
- private workersById: { [key: number]: Worker } = {};
- private activeWorkersById: { [key: number]: boolean } = {};
- public constructor(public workerPath: string, public numberOfThreads: number) {
- this.init();
- }
- }
这里还有其他一些属性,如 workersById 和 activeWorkersById,我们可以分别保存现有的 worker 和当前正在运行的 worker 的 ID。还有 queue,我们可以使用以下结构来保存对象:
- type QueueCallback<N> = (err: any, result?: N) => void;
- interface QueueItem<T, N> {
- callback: QueueCallback<N>;
- getData: () => T;
- }
callback 只是默认的节点回调,第一个参数是错误,第二个参数是可能的结果。 getData 是传递给工作池 .run() 方法的函数(如下所述),一旦项目开始处理就会被调用。 getData 函数返回的数据将传给工作线程。
在 .init() 方法中,我们创建了 worker 并将它们保存在以下状态中:
- private init() {
- if (this.numberOfThreads < 1) {
- return null;
- }
- for (let i = 0; i < this.numberOfThreads; i += 1) {
- const worker = new Worker(this.workerPath);
- this.workersById[i] = worker;
- this.activeWorkersById[i] = false;
- }
- }
为避免无限循环,我们首先要确保线程数 > 1。然后创建有效的 worker 数,并将它们的索引保存在 workersById 状态。我们在 activeWorkersById 状态中保存了它们当前是否正在运行的信息,默认情况下该状态始终为false。
现在我们必须实现前面提到的 .run() 方法来设置一个 worker 可用的任务。
- public run(getData: () => T) {
- return new Promise<N>((resolve, reject) => {
- const availableWorkerId = this.getInactiveWorkerId();
- const queueItem: QueueItem<T, N> = {
- getData,
- callback: (error, result) => {
- if (error) {
- return reject(error);
- }
- return resolve(result);
- },
- };
- if (availableWorkerId === -1) {
- this.queue.push(queueItem);
- return null;
- }
- this.runWorker(availableWorkerId, queueItem);
- });
- }
(编辑:ASP站长网)
|