concurrent.futures库提供了一个ProcessPoolExecutor类,可用来在单独运行的Python解释器实例中执行计算密集型的函数。
下面是一个简单的脚本:
# -*- coding: utf-8 -*-
import time
from concurrent.futures import ProcessPoolExecutor
def sleepy(arg):
print("I'm {}, I'm going to sleep.".format(arg))
time.sleep(6.6)
print("{} wake up!".format(arg))
return str('{} recovery!'.format(arg))
if __name__ == '__main__':
names = ['Jack', 'Anna', 'Tom', 'White', 'Paul']
begin = time.time()
with ProcessPoolExecutor() as pool:
all_man = pool.map(sleepy, names)
end = time.time()
for each in all_man:
print(each)
print('\nAmazing Five wake up, it takes {:.2f} seconds'.format(end - begin))
实际的性能会根据机器上的CPU个数而有所不同。
在底层,ProcessPoolExecutor创建了N个独立运行的Python解释器,这里的N就是在系统上检测到的可用的CPU个数。可以修改创建的Python进程个数,只要给ProcessPoolExexutor(N)提供一个可选的参数即可。进程池会一直运行,直到with语句块中的最后一条语句执行完毕为止,此时进程池就会关闭。但是,程序会一直等待所有已经提交的任务都处理完毕为止。
提交到进程池中的任务必须定义成函数的形式。有两种方法可以提交任务。如果想并行处理一个列表推导式或者map()操作,可以使用pool.map()(如上例)。
还有一种方式就是可以通过pool.submit()方法来手动提交一个单独的任务:
def when_done(r):
print('Got:', r.result())
with ProcessPoolExecutor() as pool1:
for i in range(1,10):
results = pool1.submit(sleepy, i)
results.add_done_callback(when_done) # 提供一个回调函数,让它在任务完成时得到触发执行。
如果手动提交任务,得到的结果就是一个Future实例。要获取到实际的结果还需要调用它的result()方法。直接调用会阻塞进程,直到完成并将结果返回给进程池为止。
与其让进程阻塞,也可以提供一个回调函数,让它在任务完成时得到触发执行。如上例中的 results.add_done_callback(when_done)。
要考虑的几个因素:
- 只适用于可以将问题分解成各个独立部分的情况。
- 任务必须定义成普通的函数来提交。
- 函数的参数和返回值必须可兼容于pickle编码。
- 提交的工作函数不应该维护持久的状态或者带有副作用。
- 进程池是通过调用UNIX上的fork()系统调用来创建的,会克隆出一个Python解释器,在fork()时会包含所有的程序状态。在Windows上,会加载一个独立的解释器拷贝,但并不包含状态。
- 当将进程池和多线程技术结合在一起时,应在创建任何线程之前优先创建并加载进程池。
参考书籍:
Python Cookbook:12.8 Page 521 - 525
PS:
这本书终于快看完了,也不枉这三个月来同事午休的时间我端杯咖啡去公共区啃书。里面有些内容不好啃,也做了标注,回头再继续看看。看了这么多(全书共680页),不能说自己看了就全部都会了,不过也给自己一些启发。第14章-测试、调试以及异常对测试者来说是个重点,unittest.mock()模块书中讲的不多,但很强大,用Python编写单元测试脚本的话,这是个重要点。
千万不要只以为unittest就是组建个测试集,输出测试报告的框架,远比你想象的要强大的多。
哦,对了,这是第3版。
网友评论