美文网首页Python 并行计算
mpi4py 中的共享文件指针 I/O 操作

mpi4py 中的共享文件指针 I/O 操作

作者: 自可乐 | 来源:发表于2018-08-06 16:05 被阅读103次

    上一篇中我们介绍了 mpi4py 中的非阻塞 I/O 操作,下面我们将介绍 mpi4py 中的共享文件指针 I/O 操作。

    共享文件指针

    每个通过 MPI.File.Open 打开的文件,除了每个进程所拥有的独立文件指针之外,还存在一个全局唯一的为每个进程所共享的共享文件指针。共享文件指针也是以相对于进程当前文件视图的相对位置计算,并且共享文件指针和每个进程的独立文件指针是相互独立,互不影响的。使用共享文件指针 I/O 操作方法要求所有的进程必须使用相同的文件视图,每个进程对共享文件指针读/写方法的一次调用都会将共享文件指针从当前位置移动到加上所读/写数据量的新位置,即各个进程对共享文件指针读/写方法的多次调用的实际效果相当于将这些进程的每次调用按一定的顺序串行化后执行的累加,但对非集合方法调用,这种串行化的顺序是不确定的,如果需要保证某种确定的顺序,则应用程序需使用其他机制,如同步操作等,来保证串行顺序的确定性。共享文件指针的集合操作(包括分步集合操作)访问文件顺序默认为按照进程 rank 从小到大顺序排列,排在后面的进程使用排在前面的进程更新过的共享文件指针访问数据。为防止相同进程再次发起共享文件指针操作打乱当前的执行,MPI 要求只有参加集合操作的所有进程都返回后才可发起下一个共享文件指针操作。这个规定只是从语义上保证共享文件指针操作的顺序性。实际上,具体实现时可以根据进程编号和调用参数计算出每个进程要访问的文件位置,从而可在底层调用显式偏移地址或独立文件指针 I/O 方法使之依然高效地并行执行。

    共享文件指针定位操作

    打开文件的进程组可以显式地通过 MPI.File.Seek_shared 来移动共享文件指针,其方法接口如下:

    MPI.File.Seek_shared(self, Offset offset, int whence=SEEK_SET)
    

    移动共享文件指针到指定偏移位置 offset。该方法是一个集合操作,根据 whence 参数更新共享文件指针。whence 的可能取值如下:

    • MPI.SEEK_SET:将文件指针设置为指向 offset 参数设置的位置;
    • MPI.SEEK_CUR:将文件指针设置为指向当前指针加上 offset 参数值之后的位置;
    • MPI.SEEK_END:将文件指针设置为指向文件末尾再加上 offset 参数值之后的位置。
    MPI.File.Get_position_shared(self)
    

    返回共享文件指针相对当前文件视图的位置(以 etype 为单位)。

    使用共享文件指针的读/写操作方法接口如下:

    阻塞共享文件指针 I/O 操作

    非集合操作

    MPI.File.Read_shared(self, buf, Status status=None)
    
    MPI.File.Write_shared(self, buf, Status status=None)
    

    集合操作

    使用共享文件指针的阻塞集合操作被命名为 xxxx_ordered 是因为其访问文件的顺序默认是按照进程 rank 从小到大的顺序排列的。

    MPI.File.Read_ordered(self, buf, Status status=None)
    
    MPI.File.Write_ordered(self, buf, Status status=None)
    

    注意:对阻塞共享文件指针 I/O,没有对应的分步集合操作。

    非阻塞共享文件指针 I/O 操作

    非集合操作

    非集合的非阻塞共享文件指针操作调用完后会立即返回一个 MPI.Request 对象,可以使用其 Test,Wait 等方法来检测或等待其完成。

    MPI.File.Iread_shared(self, buf)
    
    MPI.File.Iwrite_shared(self, buf)
    

    分步集合操作

    分步集合的非阻塞共享文件指针操作是一种带有一定限制性的非阻塞 I/O 操作,其限制条件及其使用注意事项基本同上一篇中介绍过的使用独立文件指针和显式偏移地址的分步集合操作。

    MPI.File.Read_ordered_begin(self, buf)
    
    MPI.File.Read_ordered_end(self, buf, Status status=None)
    
    MPI.File.Write_ordered_begin(self, buf)
    
    MPI.File.Write_ordered_end(self, buf, Status status=None)
    

    注意:对非阻塞共享文件指针 I/O,没有对应的真正意义上的集合操作,只有分步集合操作。

    例程

    下面给出使用例程。

    # shared_io.py
    
    """
    Demonstrates the usage of I/O with shared file pointers.
    
    Run this with 4 processes like:
    $ mpiexec -n 4 python shared_io.py
    """
    
    import numpy as np
    from mpi4py import MPI
    
    
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    
    buf = np.full((5,), rank, dtype='i')
    
    filename = 'temp.txt'
    
    # the etype
    etype = MPI.INT
    filetype = MPI.INT
    
    # -------------------------------------------------------------------------------
    # use blocking non-collective shared file pointer I/O
    
    # open the file for read and write, create it if it does not exist,
    # and delete it on close
    fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)
    
    # set the file view
    fh.Set_view(0, etype, filetype)
    
    # each process writes buf to file by using blocking non-collective
    # shared file pointer write
    # there is usually no oreder of the write in this case
    print 'rank %d writes %s to file' % (rank, buf)
    fh.Write_shared(buf)
    
    # synchronize here to make sure all processes have done the write
    comm.barrier()
    
    # reset the shared file pointer
    fh.Seek_shared(0)
    
    # check what's in the file
    if rank == 0:
        buf1 = np.zeros(5 * size, dtype='i')
        fh.Read_shared(buf1)
        # fh.Read(buf1)
        # fh.Read_at(0, buf1)
        print 'data in the file with Write_shared: %s' % buf1
    
    # close the file
    fh.Close()
    
    
    # # -------------------------------------------------------------------------------
    # use blocking collective shared file pointer I/O
    
    # open the file for read and write, create it if it does not exist,
    # and delete it on close
    fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)
    
    # set the file view
    fh.Set_view(0, etype, filetype)
    
    # each process writes buf to file by using blocking collective
    # shared file pointer write
    # data will be writen in the order of the rank in this case
    print 'rank %d writes %s to file' % (rank, buf)
    fh.Write_ordered(buf)
    
    # no need barrier synchronizition when use collective write
    # comm.barrier()
    
    # reset the shared file pointer
    fh.Seek_shared(0)
    
    # check what's in the file
    if rank == 0:
        buf1 = np.zeros(5 * size, dtype='i')
        fh.Read_shared(buf1)
        # fh.Read(buf1)
        # fh.Read_at(0, buf1)
        print 'data in the file with Write_ordered: %s' % buf1
    
    # close the file
    fh.Close()
    
    # -------------------------------------------------------------------------------
    # use nonblocking split collective shared file pointer I/O
    
    # open the file for read and write, create it if it does not exist,
    # and delete it on close
    fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)
    
    # set the file view
    fh.Set_view(0, etype, filetype)
    
    # each process writes buf to file by using nonblocking split collective
    # shared file pointer write
    # data will be writen in the order of the rank in this case
    print 'rank %d writes %s to file' % (rank, buf)
    fh.Write_ordered_begin(buf)
    
    # can do some computatin/communication here
    for i in range(10):
        pass
    
    fh.Write_ordered_end(buf)
    
    # no need barrier synchronizition when use collective write
    # comm.barrier()
    
    # reset the shared file pointer
    fh.Seek_shared(0)
    
    # check what's in the file
    if rank == 0:
        buf1 = np.zeros(5 * size, dtype='i')
        fh.Read_shared(buf1)
        # fh.Read(buf1)
        # fh.Read_at(0, buf1)
        print 'data in the file with Write_ordered_begin and Write_ordered_end: %s' % buf1
    
    # reset the shared file pointer
    fh.Seek_shared(0)
    
    # check with Read_ordered_begin and Read_ordered_end
    buf2 = np.zeros_like(buf)
    fh.Read_ordered_begin(buf2)
    fh.Read_ordered_end(buf2)
    assert np.allclose(buf, buf2)
    
    # close the file
    fh.Close()
    

    运行结果如下:

    $ mpiexec -n 4 python shared_io.py
    rank 1 writes [1 1 1 1 1] to file
    rank 2 writes [2 2 2 2 2] to file
    rank 3 writes [3 3 3 3 3] to file
    rank 0 writes [0 0 0 0 0] to file
    data in the file with Write_shared: [3 3 3 3 3 2 2 2 2 2 0 0 0 0 0 1 1 1 1 1]
    rank 0 writes [0 0 0 0 0] to file
    rank 2 writes [2 2 2 2 2] to file
    rank 3 writes [3 3 3 3 3] to file
    rank 1 writes [1 1 1 1 1] to file
    data in the file with Write_ordered: [0 0 0 0 0 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3]
    rank 2 writes [2 2 2 2 2] to file
    rank 0 writes [0 0 0 0 0] to file
    rank 1 writes [1 1 1 1 1] to file
    rank 3 writes [3 3 3 3 3] to file
    data in the file with Write_ordered_begin and Write_ordered_end: [0 0 0 0 0 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3]
    

    以上介绍了 mpi4py 中的共享文件指针 I/O 操作,在下一篇中我们将介绍 mpi4py 中 I/O 相关的 hints。

    相关文章

      网友评论

        本文标题:mpi4py 中的共享文件指针 I/O 操作

        本文链接:https://www.haomeiwen.com/subject/kjdgvftx.html