美文网首页Python 并行计算
mpi4py 中的非阻塞 I/O 操作

mpi4py 中的非阻塞 I/O 操作

作者: 自可乐 | 来源:发表于2018-08-05 16:46 被阅读83次

    上一篇中我们介绍了 mpi4py 中读/写文件中数组的方法,下面我们将介绍 mpi4py 中的非阻塞 I/O 操作。

    MPI 支持前面介绍过的所有读/写方法(包括使用独立文件指针和显式偏移地址的读/写方法,非集合和集合读/写方法)的非阻塞版本。非阻塞读/写方法的名称一般是在对应的阻塞读/写方法的名称前面加上 I,如 Iread/Iwrite,Iread_at/Iwrite_at 等。非阻塞读/写方法和非阻塞通信方法一样,调用完后会立即返回一个 MPI.Request 对象,可以使用其 Test,Wait 等方法来检测或等待其完成。使用非阻塞 I/O 操作的一大优势在于可以潜在地重叠或部分重叠 I/O 操作和同时发生的计算和通信。

    非集合操作

    下面是非集合的非阻塞读/写方法的使用接口,它们和对应的阻塞版本有几乎完全一样的参数(少了 status 参数,实际上是将 status 参数转移到了其返回的 MPI.Request 对象的相关方法上)。调用这些非阻塞方法会立即返回一个 MPI.Request 对象,其返回并不意味着本次 I/O 操作已经完成,必须使用其返回的 MPI.Request 对象的 Test,Wait 及其变种等方法来检测和等待其完成。

    非阻塞独立文件指针读/写

    MPI.File.Iread(self, buf)
    
    MPI.File.Iwrite(self, buf)
    

    非阻塞显式偏移地址

    MPI.File.Iread_at(self, Offset offset, buf)
    
    MPI.File.Iwrite_at(self, Offset offset, buf)
    

    分步集合操作

    对集合 I/O 操作,MPI 3.1 之前的版本只支持一种称作分步集合 I/O (split collective I/O) 的非阻塞集合 I/O 方法。这是一种带有一定限制性的非阻塞 I/O 操作。之所以称作分步集合 I/O,是因为其把集合访问文件拆分成了两个动作——启动集合访问和结束集合访问。要使用这种 I/O 称作,用户必须先调用一个 “begin” 方法,如 Read_all_begin,来启动集合 I/O 操作,然后在合适的地方调用一个匹配的 “恩典” 方法,如 Read_all_end,来完成该 I/O 操作。这种集合操作的限制在于,进程在某一时刻对某个文件句柄只允许有一个分步集合 I/O 操作处于活动状态。即使使用多线程,也不允许一个进程内同时有两个线程对同一个文件句柄并发地执行两个或更多个分步集合操作。也就是说当在某个文件句柄上调用了一个分步集合 I/O 的 “begin” 方法后,在没有调用其匹配的 “end” 方法结束该分步集合 I/O 操作之前,不能再在这个文件句柄上调用第二个分步集合 I/O 的 “begin” 方法。由于这一限制,一个分步集合 I/O 操作的 “begin” 方法不返回 MPI.Request 对象,也不返回其它结果(实际上其返回指为 None)。其紧接着的在同一个文件句柄上调用的 “end” 方法会匹配该分步集合 I/O 的 “begin”方法。MPI 标准允许 MPI 实现在 “begin” 调用时完成整个 I/O 操作,或是在 “end” 调用时完成整个 I/O 操作,也或者是在 “begin” 和 “end” 之间完成。另外,因为这些方法都是集合操作,因此必须由打开文件的进程组中的所有进程一起调用。分步集合 I/O 操作必须在 “begin” 和 “end” 方法中都指定相同的数据缓冲区参数(这样做的目的是为了防止某些编译器会优化运行时寄存器顺序而导致执行错误)。在分步集合 I/O 操作执行期间,即在 “begin” 和 “end” 方法之间,不能再对相同文件句柄并发地执行任何其他集合操作。如果使用多线程,必须在相同线程内调用一对匹配的 “begin” 和 “end” 操作。

    下面是分步集合读/写方法的使用接口:

    非阻塞独立文件指针读/写

    MPI.File.Read_all_begin(self, buf)
    
    MPI.File.Read_all_end(self, buf, Status status=None)
    
    MPI.File.Write_all_begin(self, buf)
    
    MPI.File.Write_all_end(self, buf, Status status=None)
    

    非阻塞显式偏移地址

    MPI.File.Read_at_all_begin(self, Offset offset, buf)
    
    MPI.File.Read_at_all_end(self, buf, Status status=None)
    
    MPI.File.Write_at_all_begin(self, Offset offset, buf)
    
    MPI.File.Write_at_all_end(self, buf, Status status=None)
    

    集合操作

    在 MPI 3.1 中新增加了真正意义上的非阻塞集合 I/O 操作方法。这些方法的引入旨在最终替换掉上面介绍的分步集合 I/O 操作方法,因为它们克服了分步集合 I/O 的相关限制,比如说其允许在同一个文件句柄上的多个不同的非阻塞集合 I/O 操作重叠在一起,类似于下图所示。因此在支持这些方法的 MPI 环境下,应优先使用这些方法而不是分步集合 I/O 相关方法。

    重叠非阻塞集合 I/O 操作

    非阻塞独立文件指针读/写

    MPI.File.Iread_all(self, buf)
    
    MPI.File.Iwrite_all(self, buf)
    

    非阻塞显式偏移地址

    MPI.File.Iread_at_all(self, Offset offset, buf)
    
    MPI.File.Iwrite_at_all(self, Offset offset, buf)
    

    使用共享文件指针的非阻塞读/写操作

    共享文件指针相关操作将在下一篇中介绍。

    例程

    下面给出使用例程。

    # array_io.py
    
    """
    Demonstrates the usage of nonblocking I/O.
    
    Run this with 6 processes like:
    $ mpiexec -n 6 python array_io.py
    """
    
    import numpy as np
    from mpi4py import MPI
    
    
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    
    # create a p x q Cartesian process grid
    p, q = 2, 3
    cart_comm = comm.Create_cart([p, q])
    # get the row and column coordinate of each process in the process grid
    ri, ci = cart_comm.Get_coords(rank)
    
    # the global array
    m, n = 10, 12
    global_ary = np.arange(m*n, dtype='i').reshape(m, n)
    rs, re = (m/p)*ri, (m/p)*(ri+1) # start and end of row
    cs, ce = (n/q)*ci, (n/q)*(ci+1) # start and end of column
    # local array of each process
    local_ary = np.ascontiguousarray(global_ary[rs:re, cs:ce])
    print 'rank %d has local_ary with shape %s' % (rank, local_ary.shape)
    
    filename = 'temp.txt'
    
    # the etype
    etype = MPI.INT
    
    # construct filetype
    gsizes = [m, n] # global shape of the array
    distribs = [MPI.DISTRIBUTE_BLOCK, MPI.DISTRIBUTE_BLOCK] # block distribution in both dimensions
    dargs = [MPI.DISTRIBUTE_DFLT_DARG, MPI.DISTRIBUTE_DFLT_DARG] # default distribution args
    psizes = [p, q] # process grid in C order
    filetype = MPI.INT.Create_darray(p*q, rank, gsizes, distribs, dargs, psizes)
    filetype.Commit()
    
    # -------------------------------------------------------------------------------
    # use collective I/O or non-collective I/O if collective I/O is not implemented
    
    # open the file for read and write, create it if it does not exist,
    # and delete it on close
    fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)
    
    # set the file view
    fh.Set_view(0, etype, filetype)
    
    try:
        # collectively write the data to file
        req = fh.Iwrite_all(local_ary)
    except NotImplementedError:
        print 'Iwrite_all not implemented, use Iwrite instead'
        # non-collectively write the data to file
        req = fh.Iwrite(local_ary)
    
    # do some computatin or communication here during the nonblocking I/O operation
    cnt = 0
    while(not req.Test()):
        cnt += 1
    print 'rank %d has cnt = %d' % (rank, cnt)
    
    # reset file view
    fh.Set_view(0, etype, etype)
    
    # check what's in the file
    if rank == 0:
        buf = np.zeros(m * n, dtype='i').reshape(m, n)
        req = fh.Iread_at(0, buf)
        req.Wait()
        assert np.allclose(buf, global_ary)
    
    # close the file
    fh.Close()
    
    
    # -------------------------------------------------------------------------------
    # use split collective I/O
    
    # open the file for read and write, create it if it does not exist,
    # and delete it on close
    fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)
    
    # set the file view
    fh.Set_view(0, etype, filetype)
    
    # begin the split collective write
    fh.Write_all_begin(local_ary)
    
    # do some computatin or communication here during the nonblocking I/O operation
    for i in range(10):
        pass
    
    # end the split collective write
    fh.Write_all_end(local_ary)
    
    # reset file view
    fh.Set_view(0, etype, etype)
    
    # check what's in the file
    if rank == 0:
        buf = np.zeros(m * n, dtype='i').reshape(m, n)
        req = fh.Iread_at(0, buf)
        req.Wait()
        assert np.allclose(buf, global_ary)
    
    # close the file
    fh.Close()
    

    运行结果如下:

    $ mpiexec -n 6 python nonblocking_io.py
    rank 2 has local_ary with shape (5, 4)
    rank 3 has local_ary with shape (5, 4)
    rank 4 has local_ary with shape (5, 4)
    rank 5 has local_ary with shape (5, 4)
    rank 0 has local_ary with shape (5, 4)
    rank 1 has local_ary with shape (5, 4)
    rank 5 has cnt = 679
    rank 0 has cnt = 578
    rank 2 has cnt = 833
    rank 3 has cnt = 1988
    rank 4 has cnt = 941
    rank 1 has cnt = 1199
    

    以上介绍了 mpi4py 中的非阻塞 I/O 操作,在下一篇中我们将介绍 mpi4py 中的共享文件指针 I/O 操作。

    相关文章

      网友评论

        本文标题:mpi4py 中的非阻塞 I/O 操作

        本文链接:https://www.haomeiwen.com/subject/fcibvftx.html