先导入一下用到的包
from multiprocessing import Pool, cpu_count, Process, Queue
在不使用进程池的情况下,要想利用到多核运算,我们可能会写下如下代码
tasks = Queue()
# Put tasks into task queue
# ...
def task_func():
# task function
pass
pool = []
for _ in range(cpu_count()):
p = Process(target=task_func)
p.start()
pool.append(p)
for p in pool:
p.join()
使用进程池后,我们可以使用下面的代码完成并行计算
tasks = [(1, 1), (2, 3), (4, 7), (9, 10), (11, 23)]
def add(tup):
return tup[0] + tup[1]
if __name__ == '__main__':
pool = Pool(cpu_count())
ans = pool.map(add, tasks)
print(ans)
Pool
类构造了一个进程池,Pool.map方法会自动的从池中抽取进程完成运算,并入池。这样,就隐藏了实现的细节,使我们不必在业务逻辑中掺入进程操作的代码。
一些坑
-
task function不能使用lambda表达式,下面的代码
if __name__ == '__main__': pool = Pool(cpu_count()) ans = pool.map(lambda tup: tup[0] + tup[1], tasks) pool.close() pool.join()
会报错。
_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001FFC5823E18>: attribute lookup <lambda> on __main__ failed
-
池的定义必须放到函数中而不是全局中,下面的代码
tasks = [(1, 1), (2, 3), (4, 7), (9, 10), (11, 23)] pool = Pool(cpu_count()) def add(tup): return tup[0] + tup[1] if __name__ == '__main__': ans = pool.map(add, tasks) print(ans)
会报运行时错误
网友评论