美文网首页
python并行框架对比

python并行框架对比

作者: AsdilFibrizo | 来源:发表于2020-03-21 18:06 被阅读0次

    虽然Python的多处理库已经成功地用于广泛的应用程序中,但是在本文中,我们发现它不适合一些重要的应用程序类,包括数值数据处理、状态计算和具有昂贵初始化的计算。主要有两个原因:

    1. 处理数字数据效率低下
    2. 无法在单独的“任务”之间共享变量
      本文将比较python原生多任务包multiprocessing,joblib包,以及ray包,在不同环境测试他们的并行性能

    Ray是一个快速、简单的框架,用于构建和运行解决这些问题的分布式应用程序。有关一些基本概念的介绍,请参阅本文。Ray利用Apache Arrow进行高效的数据处理,并为分布式计算提供任务和参与者抽象。
    Joblib是一组在Python中提供轻量级管道的工具,Joblib特别针对大数据进行了快速和健壮的优化,并对numpy数组进行了特定的优化。

    1.字符串并行处理

    这里引入ray,joblib和multiprocessing Pool, 默认设定为8核运行

    # 测试并行性能
    import ray
    ray.init(num_cpus=8)
    import joblib
    from multiprocessing import Pool
    import pandas as pd
    import numpy as np
    

    接下来,我们生成一对长度为80w的字符串数据, 统计相同位置字符的一致率。并统计三个包的并行,首先我们进行16次对比,都使用8核处理。

    def compare_string(args):
        string_1, string_2 = args
        same = 0
        for i in range(len(string_1)):
            if string_1[i] == string_2[i]:
                same += 1
        return same
    # ray版本的字符串对比, 只是加了一个修饰器
    @ray.remote
    def compare_string2(args):
        string_1, string_2 = args
        same = 0
        for i in range(len(string_1)):
            if string_1[i] == string_2[i]:
                same += 1
        return same
    
    string_1 = ['0']*800000
    string_2 = ['0']*800000
    args = [[string_1, string_2] for i in range(16)] # 重复16次
    
    # 把multiprocessing 测试包在一个函数中
    def test_pool(func, args):
        pool = Pool(8)
        ret = pool.map(func, args)
        pool.close()
        pool.join()
        return ret
    # 测试multiprocessing pool
    %timeit ret = test_pool(compare_string, args) 
    # 测试joblib Parallel 的loky模式(默认模式)
    %timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
    # 测试joblib Parallel 的multiprocessing模式(多进程模式)
    %timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
    # 测试ray框架并行
    %timeit ret = [compare_string2.remote(arg) for arg in args]
    

    结果如下:

    # multiprocessing pool平均时间
    1.21 s ± 82 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # joblib Parallel 的loky模式 平均时间
    42.9 s ± 418 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # joblib Parallel 的multiprocessing模式 平均时间
    1.55 s ± 117 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # ray 平均时间
    835 ms ± 27.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    字符串时间对比

    joblib loky模式直接pass掉,用时太长了,这里字符串对比一共有16次,我们增加10倍看一下其它三组的排名是否还是一致。

    string_1 = ['0']*800000
    string_2 = ['0']*800000
    args = [[string_1, string_2] for i in range(160)] # 重复160次
    # 测试multiprocessing pool
    %timeit ret = test_pool(compare_string, args) 
    # 测试joblib Parallel 的multiprocessing模式(多进程模式)
    %timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
    # 测试ray框架并行
    %timeit ret = [compare_string2.remote(arg) for arg in args]
    
    # multiprocessing pool平均时间
    2.92 s ± 84.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # joblib Parallel 的multiprocessing模式 平均时间
    10.6 s ± 258 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # ray 平均时间
    7.92 s ± 241 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    字符串时间对比2
    当重复次数变多,并行数(8核)不变时,python原生multiprocessing pool反而更快,因此如果是非数值计算,字符串统计还是建议使用python原生multiprocessing

    2.数字并行处理

    第二组对比我们进行纯数字计算对比,这里我们测试计算斐波那契数列并行用时

    def fib_loop(n):
        a, b = 0, 1
        for i in range(n + 1):
            a, b = b, a + b
        return a
    # ray 版本
    @ray.remote
    def fib_loop2(n):
        a, b = 0, 1
        for i in range(n + 1):
            a, b = b, a + b
        return a
    
    args = [10000]*1600 # 重复计算1600次,每次计算n=10000的斐波那契数列
    # 测试multiprocessing pool
    %timeit ret = test_pool(fib_loop, args)
    # 测试joblib Parallel 的loky模式(默认模式)
    %timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(fib_loop)(arg) for arg in args)
    # 测试joblib Parallel 的multiprocessing模式(多进程模式)
    %timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(fib_loop)(arg) for arg in args)
    # 测试ray框架并行
    %timeit ret = [fib_loop2.remote(arg) for arg in args]
    
    # multiprocessing pool平均时间
    742 ms ± 46.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # joblib Parallel 的loky模式 平均时间
    782 ms ± 33.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # joblib Parallel 的multiprocessing模式 平均时间
    608 ms ± 21.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # ray 平均时间
    873 ms ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    数值计算时间对比
    可以看出 joblib 和 ray框架都对数值计算进行了优化,joblib的multiprocessing最快,起始这里时间的差异更多的应该是进程通信的误差

    3.矩阵运算

    上面只是使用了简单的加法操作,这里使用scipy的矩阵运算,看看三种框架对矩阵运算的优化情况

    import scipy.signal as s
    def scipy_convolve2d(args):
        image, random_filter = args
        return s.convolve2d(image, random_filter)[::5, ::5]
    # ray版本
    @ray.remote
    def scipy_convolve2d2(args):
        image, random_filter = args
        return s.convolve2d(image, random_filter)[::5, ::5]
    
    filters = [np.random.normal(size=(4, 4)) for _ in range(8)]
    # 并行参数直接打包为args列表
    args = [(np.zeros((4000, 4000)), filters[i]) for i in range(8)]
    
    # multiprocessing pool平均时间
    %timeit ret = test_pool(scipy_convolve2d, args)
    # joblib Parallel 的loky模式 平均时间
    %timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(scipy_convolve2d)(arg) for arg in args)
    # joblib Parallel 的multiprocessing模式 平均时间
    %timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(scipy_convolve2d)(arg) for arg in args)
    #ray平均时间
    %timeit ret = ray.get([scipy_convolve2d2.remote(arg) for arg in args])
    
    # multiprocessing pool平均时间
    3.36 s ± 143 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # joblib Parallel 的loky模式 平均时间
    1.9 s ± 64.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # joblib Parallel 的multiprocessing模式 平均时间
    1.38 s ± 45.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    #ray平均时间
    1.31 s ± 53.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    多进程矩阵运算平均用时

    可以看出ray相对来说是最快的

    4.共享内存

    joblib和ray相较于原始python多进程的优势的另一个方面就是对内存的优化,对于一个较大的数据,我们只想要其中的一部分,joblib和ray都可以使用共享内存完成相应部分的计算,而不是每一个进程都放入一份独立完整的数据。

    我们使用ray和joblib,共享一份pd.DataFrame,并统计每列所有类型的出现次数
    # 生成一份大内存的pd.DataFrame
    zeros = np.zeros((10000,1000))
    ones = np.ones((10000,1000))
    df = pd.DataFrame(np.concatenate([zeros, ones], axis=0))
    
    def value_counts(df, i):
        return df.iloc[:,i].value_counts().to_dict()
    # ray版本函数
    @ray.remote
    def value_counts2(df, i):
        return df.iloc[:,i].value_counts().to_dict()
    
    # joblib共享内存函数
    import os
    import tempfile
    def memmap(data):
        tmp_folder = tempfile.mkdtemp()
        tmp_path = tmp_folder + '/joblib.mmap'
        if os.path.exists(tmp_path):  # 若存在则删除
            os.remove(tmp_path)
        _ = joblib.dump(data, tmp_path)
        memmap_data = joblib.load(tmp_path, mmap_mode='r+')
        return memmap_data
    shared_df = memmap(df) # joblib 的共享内存方法 shared_df就是共享内存的df
    
    df_id = ray.put(df) # ray共享内存的方法(封装好了更简单一些)
    
    # joblib Parallel 的loky模式
    %time ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
    # joblib Parallel 的multiprocessing模式
    %time ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
    # ray
    %time ret = [value_counts2.remote(df_id, i) for i in range(df.shape[1])]
    
    # joblib Parallel 的loky模式 跑一次时间
    CPU times: user 596 ms, sys: 68 ms, total: 664 ms
    Wall time: 1.23 s
    # joblib Parallel 的multiprocessing 跑一次时间
    CPU times: user 152 ms, sys: 64 ms, total: 216 ms
    Wall time: 611 ms
    # ray 跑一次时间
    CPU times: user 352 ms, sys: 64 ms, total: 416 ms
    Wall time: 784 ms
    
    共享内存时间对比
    结果还是joblib的multiprocessing模式最快,不过时间差距应该不大

    这里df实际上是个数值矩阵,如果将其变为字符串格式,速度会不会下降呢?
    我们将df维度降低变为100维,数值类型变为str

    # pandas处理文件,统计
    zeros = np.zeros((10000,100))
    ones = np.ones((10000,100))
    df = pd.DataFrame(np.concatenate([zeros, ones], axis=0))
    df = df.astype(str) # 转为字符串
    
    # joblib Parallel 的loky模式
    %time ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
    # joblib Parallel 的multiprocessing模式
    %time ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
    # ray
    %time ret = [value_counts2.remote(df_id, i) for i in range(df.shape[1])]
    
    joblib Parallel 的loky模式 跑一次时间
    CPU times: user 57.7 s, sys: 3.33 s, total: 1min 1s
    Wall time: 1min 1s
    # joblib Parallel 的multiprocessing模式 跑一次时间
    CPU times: user 52.9 s, sys: 3.21 s, total: 56.1 s
    Wall time: 56.7 s
    # ray 跑一次时间
    CPU times: user 256 ms, sys: 76 ms, total: 332 ms
    Wall time: 4.7 s
    
    共享内存时间测试2
    令人吃惊的是将DataFrame int类型转为str类型后,ray框架并行计算时间惊人的减少,很有可能在上次对比中数值类型并不能时间反映出两种框架对共享内存的使用效率。

    4.总结

    总的来说这三个包在一些小人物中并行时间上面差异并不大
    ray和joblib都对数值计算进行了优化
    在处理pandas共享数据时ray的优势更明显

    相关文章

      网友评论

          本文标题:python并行框架对比

          本文链接:https://www.haomeiwen.com/subject/uwayyhtx.html