用 mpiexec 执行上面的代码,能够得到一个分布式进程调度器,基于任何标准 MPI 实现都可以,结果如下:
- cluster@miriad2a:~/nfs$ mpiexec -n 2 -ppn 1 -hosts miriad2a,miriad2b python ptdist.py
- Rank-0 has sent the following tensor to Rank-1
- tensor([ 1., -1.])
- Rank-1 has recieved the following tensor from Rank-0
- tensor([ 1., -1.])
- 第一行要被执行的是 dist.init_process_group(backend),它基本上设置了参与节点之间的内部通信通道。它使用了一个参数来指定使用哪个后端(backend)。因为我们完全使用 MPI,所以在我们的例子中 backend='mpi'。也有其他的后端(例如 TCP、Gloo、NCCL)。
- 需要检索的两个参数——world size 和 rank。World 指的是在特定 mpiexec 调用环境中所有节点的集合(参见 mpiexec 中的 -hosts flag)。rank 是由 MPI 运行时为每一个进程分配的唯一整数。它从 0 开始。它们在 -hosts 中被指定的顺序用于分配数值。所以,在这个例子中,节点「miriad2a」上的进程会被赋值 Rank 0,节点「miriad2b」上的进程会被赋值为 Rank 1.
- x 是 Rank 0 打算发送到 Rank 1 的张量,通过 dist.send(x, dst=1) 完成。
- z 是 Rank 1 在接收到张量之前就创建的东西。我们需要一个早就创建好的同维度的张量作为接收传送来的张量的占位符。z 的值最终会被 x 替代。
- 与 dist.send(..) 类似,负责接收的对应函数是 dist.recv(z, src=0),它将张量接收到 z。
通信集体
我们在上一部分看到的是一个「点对点」通信的例子,在给定的环境中,rank(s) 将数据发送到特定的 rank(s)。尽管这种通信是有用的,因为它对通信提供了细粒度的控制,但是还有其他被经常使用的标准通信模式,叫作集体(collectives)。下面介绍了 Synchronous SGD 算法中我们感兴趣的一个集体——all-reduce 集体。
1. ALL-REDUCE 集体
All-reduce 是一种同步通信方式,所有的 ranks 都被执行了一个 reduction 运算,并且得到的结果对所有的 ranks 都是可见的。下图介绍了这个思想(将求和作为 reduction 运算)。
all-reduce 集体
- def main(rank, world):
- if rank == 0:
- x = torch.tensor([1.])
- elif rank == 1:
- x = torch.tensor([2.])
- elif rank == 2:
- x = torch.tensor([-3.])
-
- dist.all_reduce(x, op=dist.reduce_op.SUM)
- print('Rank {} has {}'.format(rank, x))
-
- if __name__ == '__main__':
- dist.init_process_group(backend='mpi')
- main(dist.get_rank(), dist.get_world_size())
PyTorch 中 all-reduce 集体的基本用法
在 world of 3 环境中启动时,结果如下:
- cluster@miriad2a:~/nfs$ mpiexec -n 3 -ppn 1 -hosts miriad2a,miriad2b,miriad2c python ptdist.py
- Rank 1 has tensor([0.])
- Rank 0 has tensor([0.])
- Rank 2 has tensor([0.])
- if rank == … elif 是我们在分布式计算中多次遇到的模式。在这个例子中,它被用来在不同的 rank 上创建张量。
- 它们一起执行了 all-reduce(可以看见,dist.all_reduce(..) 在 if … elif block 逻辑块的外部),求和 (dist.reduce_op.SUM) 作为 reduction 运算。
- 将来自每个 rank 的 x 求和,再把得到的求和结果放置在每个 rank 的 x 内。
转向深度学习
(编辑:ASP站长网)
|