In the previous article we introduced the reduction operations in mpi4py. Below we introduce the allgather operations.
An allgather on an intra-communicator concatenates the data from the send buffers of all processes in the group and delivers the result to the receive buffer of every process.
For an allgather on an inter-communicator, assume the two associated groups are group A and group B. Each process in A contributes one data item; these items are concatenated and stored on every process of B. At the same time, the processes of B concatenate their own data items and send the result to every process of A. Therefore the send buffer arguments of A must match the receive buffer arguments of B, and vice versa.
An allgather on an inter-communicator can be asymmetric: the amount of data sent by the processes of group A may differ from the amount sent by the processes of group B, and in one direction some processes may even send no data at all.
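To make the inter-communicator case concrete, here is a minimal sketch (not from the original example) that splits MPI.COMM_WORLD into two groups, builds an inter-communicator between them, and performs an Allgather across it. The file name intercomm_allgather.py, the even/odd split and the tag value 12 are assumptions chosen only for illustration.
# intercomm_allgather.py
"""
A minimal sketch (assumed setup): Allgather over an inter-communicator.
Run this with an even number of processes, e.g.:
$ mpiexec -n 4 python intercomm_allgather.py
"""
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# split COMM_WORLD into two groups: even ranks (group A) and odd ranks (group B)
color = rank % 2
local_comm = comm.Split(color, key=rank)

# build an inter-communicator between the two groups; the local leader is rank 0
# of each local group, the remote leader is world rank 1 or 0 (tag 12 is arbitrary)
inter_comm = local_comm.Create_intercomm(0, comm, 1 - color, tag=12)

# each process contributes its world rank; the receive buffer is filled with the
# concatenated contributions of the *remote* group
send_buf = np.array([rank], dtype='i')
recv_buf = np.empty(inter_comm.Get_remote_size(), dtype='i')
inter_comm.Allgather(send_buf, recv_buf)
print('inter-communicator Allgather: rank %d has %s' % (rank, recv_buf))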
Method interfaces
The allgather methods in mpi4py (methods of the MPI.Comm class) have the following interfaces:
allgather(self, sendobj)
Allgather(self, sendbuf, recvbuf)
Allgatherv(self, sendbuf, recvbuf)
The parameters of these methods are similar to those of the corresponding gather methods, except that the allgather operations have no root parameter.
For Allgather and Allgatherv on an intra-communicator object, the sendbuf argument can be set to MPI.IN_PLACE. In that case recvbuf is used both as the send buffer and as the receive buffer, and it is assumed that the data each process would receive from itself is already placed at its correct position in recvbuf.
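Note also that for Allgatherv the recvbuf argument is usually passed as a list of the form [data, counts, displacements, datatype], so every process knows how many elements each rank contributes and at which offset to place them. The following two-process sketch illustrates only this buffer specification; the file name allgatherv_min.py and the chosen counts are assumptions, and the full example below shows the four-process case.
# allgatherv_min.py
# A minimal sketch of the Allgatherv receive-buffer specification.
# Run with: $ mpiexec -n 2 python allgatherv_min.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# rank 0 contributes 1 element, rank 1 contributes 2 elements
send_buf = np.arange(rank + 1, dtype='i') + 10 * rank
counts = [1, 2]    # number of elements contributed by rank 0 and rank 1
displs = [0, 1]    # offsets in recv_buf where each rank's contribution starts
recv_buf = np.empty(sum(counts), dtype='i')

# recvbuf is given as [data, counts, displacements, datatype]
comm.Allgatherv(send_buf, [recv_buf, counts, displs, MPI.INT])
print('rank %d has %s' % (rank, recv_buf))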
Example
An example program using the allgather operations is given below.
# allgather.py
"""
Demonstrates the usage of allgather, Allgather, Allgatherv.
Run this with 4 processes like:
$ mpiexec -n 4 python allgather.py
"""
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# ------------------------------------------------------------------------------
# gather generic object from each process by using allgather
if rank == 0:
    send_obj = 1.2
elif rank == 1:
    send_obj = 'xxx'
elif rank == 2:
    send_obj = {'a': 1}
else:
    send_obj = (2,)
recv_obj = comm.allgather(send_obj)
print('allgather: rank %d has %s' % (rank, recv_obj))
# ------------------------------------------------------------------------------
# gather same length numpy arrays from each process by using Allgather
send_buf = np.array([0, 1], dtype='i') + 2 * rank
recv_buf = np.empty(8, dtype='i')
comm.Allgather(send_buf, recv_buf)
print('Allgather: rank %d has %s' % (rank, recv_buf))
# ------------------------------------------------------------------------------
# gather same length numpy arrays from each process by using Allgather with MPI.IN_PLACE
# initialize a receive buffer for each process
recv_buf = np.zeros(8, dtype='i') - 1
if rank == 0:
    recv_buf[:2] = np.array([0, 1])   # [0, 1, -1, -1, -1, -1, -1, -1]
elif rank == 1:
    recv_buf[2:4] = np.array([2, 3])  # [-1, -1, 2, 3, -1, -1, -1, -1]
elif rank == 2:
    recv_buf[4:6] = np.array([4, 5])  # [-1, -1, -1, -1, 4, 5, -1, -1]
elif rank == 3:
    recv_buf[6:] = np.array([6, 7])   # [-1, -1, -1, -1, -1, -1, 6, 7]
# with MPI.IN_PLACE, recv_buf is used as both a send and a receive buffer
comm.Allgather(MPI.IN_PLACE, recv_buf)
print('Allgather: rank %d has %s with MPI.IN_PLACE' % (rank, recv_buf))
# ------------------------------------------------------------------------------
# gather numpy arrays with different lengths from each process by using Allgatherv
if rank == 0:
    send_buf = np.array([10, 11, 12], dtype='i')
elif rank == 1:
    send_buf = np.array([13], dtype='i')
elif rank == 2:
    send_buf = np.array([14, 15, 16, 17], dtype='i')
else:
    send_buf = np.array([18, 19], dtype='i')
recv_buf = np.empty(10, dtype='i')
count = [3, 1, 4, 2]
displ = [0, 3, 4, 8]
# gather numpy arrays with different lengths from each process to every process,
# laid out in recv_buf as:
#   rank 0   | rank 1 | rank 2      | rank 3
# -----------+--------+-------------+-------------
#   10 11 12 | 13     | 14 15 16 17 | 18 19
# displ: 0     3        4             8
comm.Allgatherv(send_buf, [recv_buf, count, displ, MPI.INT])
print('Allgatherv: rank %d has %s' % (rank, recv_buf))
The result of running it is as follows:
$ mpiexec -n 4 python allgather.py
allgather: rank 1 has [1.2, 'xxx', {'a': 1}, (2,)]
allgather: rank 2 has [1.2, 'xxx', {'a': 1}, (2,)]
allgather: rank 3 has [1.2, 'xxx', {'a': 1}, (2,)]
allgather: rank 0 has [1.2, 'xxx', {'a': 1}, (2,)]
Allgather: rank 3 has [0 1 2 3 4 5 6 7]
Allgather: rank 3 has [0 1 2 3 4 5 6 7] with MPI.IN_PLACE
Allgatherv: rank 3 has [10 11 12 13 14 15 16 17 18 19]
Allgather: rank 1 has [0 1 2 3 4 5 6 7]
Allgather: rank 1 has [0 1 2 3 4 5 6 7] with MPI.IN_PLACE
Allgatherv: rank 1 has [10 11 12 13 14 15 16 17 18 19]
Allgather: rank 2 has [0 1 2 3 4 5 6 7]
Allgather: rank 2 has [0 1 2 3 4 5 6 7] with MPI.IN_PLACE
Allgatherv: rank 2 has [10 11 12 13 14 15 16 17 18 19]
Allgather: rank 0 has [0 1 2 3 4 5 6 7]
Allgather: rank 0 has [0 1 2 3 4 5 6 7] with MPI.IN_PLACE
Allgatherv: rank 0 has [10 11 12 13 14 15 16 17 18 19]
Above we introduced the allgather operations in mpi4py. In the next article we will introduce the allreduce operations.