虽然Python的多处理库已经成功地用于广泛的应用程序中,但是在本文中,我们发现它不适合一些重要的应用程序类,包括数值数据处理、状态计算和具有昂贵初始化的计算。主要有两个原因:
Ray是一个快速、简单的框架,用于构建和运行解决这些问题的分布式应用程序。有关一些基本概念的介绍,请参阅本文。Ray利用Apache Arrow进行高效的数据处理,并为分布式计算提供任务和参与者抽象。
Joblib是一组在Python中提供轻量级管道的工具,Joblib特别针对大数据进行了快速和健壮的优化,并对numpy数组进行了特定的优化。
1.字符串并行处理
这里引入ray,joblib和multiprocessing Pool, 默认设定为8核运行
# 测试并行性能
import ray
ray.init(num_cpus=8)
import joblib
from multiprocessing import Pool
import pandas as pd
import numpy as np
接下来,我们生成一对长度为80w的字符串数据, 统计相同位置字符的一致率。并统计三个包的并行,首先我们进行16次对比,都使用8核处理。
def compare_string(args):
string_1, string_2 = args
same = 0
for i in range(len(string_1)):
if string_1[i] == string_2[i]:
same += 1
return same
# ray版本的字符串对比, 只是加了一个修饰器
@ray.remote
def compare_string2(args):
string_1, string_2 = args
same = 0
for i in range(len(string_1)):
if string_1[i] == string_2[i]:
same += 1
return same
string_1 = ['0']*800000
string_2 = ['0']*800000
args = [[string_1, string_2] for i in range(16)] # 重复16次
# 把multiprocessing 测试包在一个函数中
def test_pool(func, args):
pool = Pool(8)
ret = pool.map(func, args)
pool.close()
pool.join()
return ret
# 测试multiprocessing pool
%timeit ret = test_pool(compare_string, args)
# 测试joblib Parallel 的loky模式(默认模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
# 测试joblib Parallel 的multiprocessing模式(多进程模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
# 测试ray框架并行
%timeit ret = [compare_string2.remote(arg) for arg in args]
结果如下:
# multiprocessing pool平均时间
1.21 s ± 82 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的loky模式 平均时间
42.9 s ± 418 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均时间
1.55 s ± 117 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# ray 平均时间
835 ms ± 27.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
字符串时间对比
joblib loky模式直接pass掉,用时太长了,这里字符串对比一共有16次,我们增加10倍看一下其它三组的排名是否还是一致。
string_1 = ['0']*800000
string_2 = ['0']*800000
args = [[string_1, string_2] for i in range(160)] # 重复160次
# 测试multiprocessing pool
%timeit ret = test_pool(compare_string, args)
# 测试joblib Parallel 的multiprocessing模式(多进程模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
# 测试ray框架并行
%timeit ret = [compare_string2.remote(arg) for arg in args]
# multiprocessing pool平均时间
2.92 s ± 84.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均时间
10.6 s ± 258 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# ray 平均时间
7.92 s ± 241 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
字符串时间对比2
当重复次数变多,并行数(8核)不变时,python原生multiprocessing pool反而更快,因此如果是非数值计算,字符串统计还是建议使用python原生multiprocessing
2.数字并行处理
第二组对比我们进行纯数字计算对比,这里我们测试计算斐波那契数列并行用时
def fib_loop(n):
a, b = 0, 1
for i in range(n + 1):
a, b = b, a + b
return a
# ray 版本
@ray.remote
def fib_loop2(n):
a, b = 0, 1
for i in range(n + 1):
a, b = b, a + b
return a
args = [10000]*1600 # 重复计算1600次,每次计算n=10000的斐波那契数列
# 测试multiprocessing pool
%timeit ret = test_pool(fib_loop, args)
# 测试joblib Parallel 的loky模式(默认模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(fib_loop)(arg) for arg in args)
# 测试joblib Parallel 的multiprocessing模式(多进程模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(fib_loop)(arg) for arg in args)
# 测试ray框架并行
%timeit ret = [fib_loop2.remote(arg) for arg in args]
# multiprocessing pool平均时间
742 ms ± 46.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的loky模式 平均时间
782 ms ± 33.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均时间
608 ms ± 21.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# ray 平均时间
873 ms ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
数值计算时间对比
可以看出 joblib 和 ray框架都对数值计算进行了优化,joblib的multiprocessing最快,起始这里时间的差异更多的应该是进程通信的误差
3.矩阵运算
上面只是使用了简单的加法操作,这里使用scipy的矩阵运算,看看三种框架对矩阵运算的优化情况
import scipy.signal as s
def scipy_convolve2d(args):
image, random_filter = args
return s.convolve2d(image, random_filter)[::5, ::5]
# ray版本
@ray.remote
def scipy_convolve2d2(args):
image, random_filter = args
return s.convolve2d(image, random_filter)[::5, ::5]
filters = [np.random.normal(size=(4, 4)) for _ in range(8)]
# 并行参数直接打包为args列表
args = [(np.zeros((4000, 4000)), filters[i]) for i in range(8)]
# multiprocessing pool平均时间
%timeit ret = test_pool(scipy_convolve2d, args)
# joblib Parallel 的loky模式 平均时间
%timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(scipy_convolve2d)(arg) for arg in args)
# joblib Parallel 的multiprocessing模式 平均时间
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(scipy_convolve2d)(arg) for arg in args)
#ray平均时间
%timeit ret = ray.get([scipy_convolve2d2.remote(arg) for arg in args])
# multiprocessing pool平均时间
3.36 s ± 143 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的loky模式 平均时间
1.9 s ± 64.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均时间
1.38 s ± 45.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#ray平均时间
1.31 s ± 53.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
多进程矩阵运算平均用时
可以看出ray相对来说是最快的
4.共享内存
joblib和ray相较于原始python多进程的优势的另一个方面就是对内存的优化,对于一个较大的数据,我们只想要其中的一部分,joblib和ray都可以使用共享内存完成相应部分的计算,而不是每一个进程都放入一份独立完整的数据。
我们使用ray和joblib,共享一份pd.DataFrame,并统计每列所有类型的出现次数
# 生成一份大内存的pd.DataFrame
zeros = np.zeros((10000,1000))
ones = np.ones((10000,1000))
df = pd.DataFrame(np.concatenate([zeros, ones], axis=0))
def value_counts(df, i):
return df.iloc[:,i].value_counts().to_dict()
# ray版本函数
@ray.remote
def value_counts2(df, i):
return df.iloc[:,i].value_counts().to_dict()
# joblib共享内存函数
import os
import tempfile
def memmap(data):
tmp_folder = tempfile.mkdtemp()
tmp_path = tmp_folder + '/joblib.mmap'
if os.path.exists(tmp_path): # 若存在则删除
os.remove(tmp_path)
_ = joblib.dump(data, tmp_path)
memmap_data = joblib.load(tmp_path, mmap_mode='r+')
return memmap_data
shared_df = memmap(df) # joblib 的共享内存方法 shared_df就是共享内存的df
df_id = ray.put(df) # ray共享内存的方法(封装好了更简单一些)
# joblib Parallel 的loky模式
%time ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# joblib Parallel 的multiprocessing模式
%time ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# ray
%time ret = [value_counts2.remote(df_id, i) for i in range(df.shape[1])]
# joblib Parallel 的loky模式 跑一次时间
CPU times: user 596 ms, sys: 68 ms, total: 664 ms
Wall time: 1.23 s
# joblib Parallel 的multiprocessing 跑一次时间
CPU times: user 152 ms, sys: 64 ms, total: 216 ms
Wall time: 611 ms
# ray 跑一次时间
CPU times: user 352 ms, sys: 64 ms, total: 416 ms
Wall time: 784 ms
共享内存时间对比
结果还是joblib的multiprocessing模式最快,不过时间差距应该不大
这里df实际上是个数值矩阵,如果将其变为字符串格式,速度会不会下降呢?
我们将df维度降低变为100维,数值类型变为str
# pandas处理文件,统计
zeros = np.zeros((10000,100))
ones = np.ones((10000,100))
df = pd.DataFrame(np.concatenate([zeros, ones], axis=0))
df = df.astype(str) # 转为字符串
# joblib Parallel 的loky模式
%time ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# joblib Parallel 的multiprocessing模式
%time ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# ray
%time ret = [value_counts2.remote(df_id, i) for i in range(df.shape[1])]
joblib Parallel 的loky模式 跑一次时间
CPU times: user 57.7 s, sys: 3.33 s, total: 1min 1s
Wall time: 1min 1s
# joblib Parallel 的multiprocessing模式 跑一次时间
CPU times: user 52.9 s, sys: 3.21 s, total: 56.1 s
Wall time: 56.7 s
# ray 跑一次时间
CPU times: user 256 ms, sys: 76 ms, total: 332 ms
Wall time: 4.7 s
共享内存时间测试2
令人吃惊的是将DataFrame int类型转为str类型后,ray框架并行计算时间惊人的减少,很有可能在上次对比中数值类型并不能时间反映出两种框架对共享内存的使用效率。
4.总结
总的来说这三个包在一些小人物中并行时间上面差异并不大
ray和joblib都对数值计算进行了优化
在处理pandas共享数据时ray的优势更明显
网友评论