美文网首页Python 并行计算
mpi4py 中读/写文件中数组的方法

mpi4py 中读/写文件中数组的方法

作者: 自可乐 | 来源:发表于2018-08-03 21:21 被阅读126次

    上一篇中我们介绍了 mpi4py 中的不连续读/写和集合 I/O 操作,下面我们将介绍 mpi4py 中读/写文件中数组的方法。

    在并行科学计算程序中,经常会涉及到读/写文件中的数组(包括子数组和分布式数组,数组可以是多维的,规则分布或不规则分布的)。MPI 提供了相应的方法使这种类型的操作方便而高效。

    在并行应用中,数组一般会按照某种方式分布在多个进程中,而程序需要将这些分布在各个进程中的数组按照整体的行优先顺序(C 数组的排列方式)或列优先顺序(Fortran 数组的排列方式)写入到文件中,或者从文件中将一个整体的数组读取并分布到各个进程上。比如像下图所示,一个两维的 m 行 n 列的数组分布在一个 2 × 3 的进程网格上。

    m × n 数组分布在 2 × 3 的进程网格上

    如果要将这些进程中的数据按照行优先的排列方法整体地写入到一个文件中,则可以看出某个进程本地的子数组并不是连续地位于文件中的某块区域,因此我们必须执行一种不连续的数据写过程,我们可以利用上一篇所介绍的相关方法。因为类似这样的数据读/写操作是如此的普遍,而要自己创建描述这种操作的数据类型又是麻烦而且容易出错的,因此 MPI 提供了相应的数据描述方法供我们方便地使用。利用这些数据描述方法并结合集合 I/O 操作,往往可以允许我们通过仅仅一次读/写调用完成对这个分布式数组的读/写操作,并且 MPI 实现可能提供高的优化性能,尽管执行的是非连续的读/写。下面对执行数据描述方法做简要的介绍,用这些数据描述方法创建的数据类型可以作为文件视图的 filetype 以实现对数组的方便而高效的读/写操作。

    分布式数组

    分布式数组数据描述方法是一种方便易用的描述一个线性化的规则多维数组中一个子数组位置的派生数据类型创建方法,其方法接口如下:

    MPI.Datatype.Create_darray(self, int size, int rank, gsizes, distribs, dargs, psizes, int order=ORDER_C)
    

    该方法在前面数据类型创建方法中作过相应的介绍,这里不再赘述,只强调几点:在创建分布式数组派生数据类型时,进程网格总是假定与整体数组有着相同数目的维数,如果整体数组在某个维度上不分布,进程网格也不能省略掉该维度,而必须设置该维度上的进程数为 1。比如说,一个 10 × 10 的整体数组分布在 4 个进程上,则这 4 个进程可以排成 2 × 2 的进程网格,1 × 4 进程网格,或是 4 × 1 进程网格。进程网格必须总是按照行优先的顺序(即 C 数组排列顺序)。如果程序中要使用一种不同的进程网格排列顺序,则不能使用该数据类型创建方法,可以考虑下面介绍的子数组类型创建方法或其它的派生数据类型创建方法。

    子数组

    子数组数据类型描述方法是另一种描述一个线性化的规则多维数组中一个子数组位置的派生数据类型创建方法,其方法接口如下:

    MPI.Datatype.Create_subarray(self, sizes, subsizes, starts, int order=ORDER_C)
    

    该方法在前面数据类型创建方法中作过相应的介绍,这里不再赘述,只强调几点:子数组数据类型创建方法只能描述块状分布的子数组,而不能像上面介绍的分布式数组创建方法那样描述循环分布及更普遍的块状循环分布。另外,子数组数据类型创建方法对进程的拓扑顺序没有要求,可以是列优先的顺序,还可以是其它任何排列顺序。一般为了方便,可以使用虚拟进程拓扑方法首先按照某种虚拟拓扑结构安排好相应的进程。

    子数组数据类型描述方法也能很好地用来描述一类带有 ghost area 的分布式数组。在一些应用中,分布于某个进程的本地数组在一些维度上会包含几个额外的行或列,这些额外的区域,并不是该进程的本地数组的实际部分,通常被称做 halo 或者 ghost area。这些 ghost area 一般是用来存储邻近进程的行或列以利于该进程和邻近进程之间的通信及便于对本地数组的相关操作。下图给出了一个带有 ghost area 的本地数组的例子。这个本地数组实际上只有 100 行 100 列,但是在其外围包裹了 4 行或 4 列的 ghost area,使其变成了一个 108 行 108 列的数组。可以看出,在这个带有 ghost area 的本地数组中,处于中心部位的实际数组在内存中的排布也是不连续的。另外,分布在各个进程中的数组作为一个整体如果要写入到文件或从文件读入各个进程,也会涉及不连续的读/写过程。在这种情况下,数据的 I/O 操作在内存和文件中都是不连续的,但是借助子数组数据类型及集合 I/O 读写方法,我们依然可以通过一次读/写调用完成相应的操作。

    带有 ghost area 的本地数组

    不规则分布式数组

    MPI 也提供了方法来读/写不规则的分布式数组,只要使用合适的 filetype 来设置文件视图即可。如果和集合 I/O 方法结合起来使用,MPI 实现也可能以高的性能完成对这类数据的读/写操作,虽然这类读/写操作一般认为是很难优化的。不规则的分布是指不能很好地用简单的数学表达的分布形式,不像一般的块状或者循环分布那样有规律性。对这类操作,我们需要使用一种类型映射图方式来描述进程本地数组中的每一个元素与整体数组元素之间的映射关系。下图中就给出了这种类型映射图的一个例子,映射图中的每一个元素指定本地数组中位于该处的元素对应文件中的元素的位置,比如说,0 号进程本地数组的第 0,1,2,3 个元素分别对应文件中的第 0,3,8,11 个元素,1 号进程本地数组的第 0,1,2,3 个元素分别对应文件中的第 2,4,7,13 个元素,等等。

    类型映射图

    这是一种比上面介绍的分布式数组和子数组数据描述方法更加通用的数据类型描述方法。不过使用这种数据描述方法需要注意的是,MPI 标准规定每个进程用来设置文件视图的 filetype 只能描述文件中单调非减的偏移位置。但是对内存中的数据类型描述则没有此限制。因此在一些情况下,我们可能需要将对应内存中的数据描述做适当的调整和重新排列,以保持设置文件视图的 filetype 中的偏移位置是单调非减的。创建这种对不规则分布式数组描述的数据类型的相关方法接口如下:

    MPI.Datatype.Create_indexed(self, blocklengths, displacements)
    
    MPI.Datatype.Create_hindexed(self, blocklengths, displacements)
    
    MPI.Datatype.Create_indexed_block(self, int blocklength, displacements)
    
    MPI.Datatype.Create_hindexed_block(self, int blocklength, displacements)
    

    这些方法在前面数据类型创建方法中作过相应的介绍,这里不再赘述。

    例程

    下面给出使用例程。

    # array_io.py
    
    """
    Demonstrates how to access arrays stored in file.
    
    Run this with 6 processes like:
    $ mpiexec -n 6 python array_io.py
    """
    
    import numpy as np
    from mpi4py import MPI
    
    
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    
    # create a 2 x 3 Cartesian process grid
    cart_comm = comm.Create_cart([2, 3])
    # get the row and column coordinate of each process in the process grid
    ri, ci = cart_comm.Get_coords(rank)
    
    # the global array
    global_ary = np.arange(10*12, dtype='i').reshape(10, 12)
    rs, re = 5*ri, 5*(ri+1) # start and end of row
    cs, ce = 4*ci, 4*(ci+1) # start and end of column
    # local array of each process
    local_ary = np.ascontiguousarray(global_ary[rs:re, cs:ce])
    print 'rank %d has local_ary with shape %s' % (rank, local_ary.shape)
    
    filename = 'temp.txt'
    
    # -------------------------------------------------------------------------------
    # use darray type
    
    # open the file for read and write, create it if it does not exist,
    # and delete it on close
    fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)
    
    # the etype
    etype = MPI.INT
    
    # construct filetype
    gsizes = [10, 12] # global shape of the array
    distribs = [MPI.DISTRIBUTE_BLOCK, MPI.DISTRIBUTE_BLOCK] # block distribution in both dimensions
    dargs = [MPI.DISTRIBUTE_DFLT_DARG, MPI.DISTRIBUTE_DFLT_DARG] # default distribution args
    psizes = [2, 3] # process grid in C order
    filetype = MPI.INT.Create_darray(6, rank, gsizes, distribs, dargs, psizes)
    filetype.Commit()
    
    # set the file view
    fh.Set_view(0, etype, filetype)
    
    # collectively write the data to file
    fh.Write_all(local_ary)
    
    # reset file view
    fh.Set_view(0, etype, etype)
    
    # check what's in the file
    if rank == 0:
        buf = np.zeros(10 * 12, dtype='i').reshape(10, 12)
        fh.Read_at(0, buf)
        assert np.allclose(buf, global_ary)
    
    # close the file
    fh.Close()
    
    
    # -------------------------------------------------------------------------------
    # use subarray type
    
    # open the file for read and write, create it if it does not exist,
    # and delete it on close
    fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)
    
    # the etype
    etype = MPI.INT
    
    # construct filetype
    gsizes = [10, 12] # global shape of the array
    subsize = [5, 4] # shape of local subarray
    starts = [rs, cs] # global indices of the first element of the local array
    filetype = MPI.INT.Create_subarray(gsizes, subsize, starts)
    filetype.Commit()
    
    # set the file view
    fh.Set_view(0, etype, filetype)
    
    # collectively write the data to file
    fh.Write_all(local_ary)
    
    # reset file view
    fh.Set_view(0, etype, etype)
    
    # check what's in the file
    if rank == 0:
        buf = np.zeros(10 * 12, dtype='i').reshape(10, 12)
        fh.Read_at(0, buf)
        assert np.allclose(buf, global_ary)
    
    # close the file
    fh.Close()
    
    
    # -------------------------------------------------------------------------------
    # use subarray type to access local array with ghost area
    
    # create local array with one row and one column ghost area outside
    local_ghost = np.zeros((7, 6), dtype='i')
    local_ghost[1:6, 1:5] = local_ary # put local_ary in the center of local_ghost
    # here you can fill in the ghost area, but data in ghost area will not be writen
    # to file, so we omit it here...
    
    # open the file for read and write, create it if it does not exist,
    # and delete it on close
    fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)
    
    # the etype
    etype = MPI.INT
    
    # construct filetype
    gsizes = [10, 12] # global shape of the array
    subsize = [5, 4] # shape of local subarray
    starts = [rs, cs] # global indices of the first element of the local array
    filetype = MPI.INT.Create_subarray(gsizes, subsize, starts)
    filetype.Commit()
    
    # set the file view
    fh.Set_view(0, etype, filetype)
    
    # create a subarray type to describe the data located in local_phost without ghost area
    memsize = local_ghost.shape
    subsize = local_ary.shape
    starts  = [1, 1]
    memtype = MPI.INT.Create_subarray(memsize, subsize, starts)
    memtype.Commit()
    
    # collectively write the actual data inside local_ghost to file
    fh.Write_all([local_ghost, 1, memtype])
    
    # reset file view
    fh.Set_view(0, etype, etype)
    
    # check what's in the file
    if rank == 0:
        buf = np.zeros(10 * 12, dtype='i').reshape(10, 12)
        fh.Read_at(0, buf)
        assert np.allclose(buf, global_ary)
    
    # close the file
    fh.Close()
    
    
    # -------------------------------------------------------------------------------
    # use map array to access irregularly distributed array
    
    global_ary = np.arange(100, 124, dtype='i')
    index_ary = np.arange(4*6, dtype='i')
    # permutate the index array
    if rank == 0:
        rand_index = np.random.permutation(index_ary)
    else:
        rand_index = None
    rand_index = comm.bcast(rand_index, root=0)
    map_ary = np.sort(rand_index[4*rank:4*(rank+1)]) # map array should be nondecreasing
    local_ary = global_ary[map_ary]
    if rank == 0:
        print 'global_ary: %s' % global_ary
    print 'rank %d has local_ary: %s, map_ary: %s' % (rank, local_ary, map_ary)
    
    # open the file for read and write, create it if it does not exist,
    # and delete it on close
    fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_RDWR | MPI.MODE_DELETE_ON_CLOSE)
    
    # the etype
    etype = MPI.INT
    
    # construct filetype
    filetype = MPI.INT.Create_indexed_block(1, displacements=map_ary)
    filetype.Commit()
    
    # set the file view
    fh.Set_view(0, etype, filetype)
    
    # collectively write the data to file
    fh.Write_all(local_ary)
    
    # reset file view
    fh.Set_view(0, etype, etype)
    
    # check what's in the file
    if rank == 0:
        buf = np.zeros(24, dtype='i')
        fh.Read_at(0, buf)
        assert np.allclose(buf, global_ary)
    
    # close the file
    fh.Close()
    

    运行结果如下:

    $ mpiexec -n 6 python array_io.py
    rank 0 has local_ary with shape (5, 4)
    rank 1 has local_ary with shape (5, 4)
    rank 2 has local_ary with shape (5, 4)
    rank 3 has local_ary with shape (5, 4)
    rank 4 has local_ary with shape (5, 4)
    rank 5 has local_ary with shape (5, 4)
    rank 4 has local_ary: [100 107 113 118], map_ary: [ 0  7 13 18]
    rank 5 has local_ary: [103 111 116 117], map_ary: [ 3 11 16 17]
    global_ary: [100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
    118 119 120 121 122 123]
    rank 0 has local_ary: [108 109 110 120], map_ary: [ 8  9 10 20]
    rank 1 has local_ary: [102 104 114 115], map_ary: [ 2  4 14 15]
    rank 2 has local_ary: [105 106 119 121], map_ary: [ 5  6 19 21]
    rank 3 has local_ary: [101 112 122 123], map_ary: [ 1 12 22 23]
    

    以上介绍了 mpi4py 中读/写文件中数组的方法,在下一篇中我们将介绍 mpi4py 中的非阻塞 I/O 操作。

    相关文章

      网友评论

        本文标题:mpi4py 中读/写文件中数组的方法

        本文链接:https://www.haomeiwen.com/subject/mjnuvftx.html