主要考量Python多进程在不同应用场景下的不同实践,不涉及Queue、Lock、Event等概念。
以下代码均在linux下测试。由于Python未在windows下实现os.fork(),多进程在Unix系统下才能获得最佳体验。
创建一个进程
使用Process类可以创建fork进程并管理。一般在有限几个子进程任务时这样处理。
import time
from multiprocessing import Process
def foo(a,b):
time.sleep(1)
print(a+b)
# 创建进程,指定函数和参数
p = Process(target=foo, args=(1,2))
# 设置为守护进程(守护进程会随主进程退出而退出)
p.daemon = True
# 启动进程
p.start()
# 子进程阻塞主进程,执行完毕后主进程继续
p.join()
print('进程执行完毕')
创建多个进程
params_list = [(1,2), (2,3), (3,4)]
ps = []
for params in params_list:
p = Process(target=foo, args=(1,2))
p.daemon = True
ps.append(p)
p.start()
for p in ps:
# 实际上,对ps的遍历也会被p.join()阻塞。但最终执行完毕的时机是一样的
p.join()
print('多个进程执行完毕')
利用Pool管控多进程
Pool类提供了进程池,可以轻松的面对大量任务分布到有限进程的问题。
from multiprocessing import Pool, cpu_count
# 创建进程池,最大进程数为cpu逻辑核心数
pool = Pool(cpu_count())
results = []
for params in params_list:
# 异步添加任务
result = pool.apply_async(foo, params)
results.append(result)
# 关闭进程池接收
pool.close()
# 阻塞主进程
pool.join()
print('多个进程执行完毕')
# 获取结果
for result in results:
print(result.get())
利用Pool.imap()迭代管控多进程
这种方式利用了Python迭代特性。
在面对海量数据源和巨大计算量时,既利用了多核性能,又避免在内存中读入过多数据。
# 迭代从mongodb查询数据并处理,利用多进程提高计算效率,同时保证内存中仅存在有限几条数据
from multiprocessing import Pool, cpu_count
from pymongo import MongoClient
# 连接mongodb,获取集合testdb.test
connection = MongoClient()
db = connection.testdb
collection = db.test
# 数据处理任务
def deal_doc(doc):
# 处理文档,如利用文档进行复杂计算
result = 1
return result
# 创建进程池,最大进程数为cpu逻辑核心数
pool = Pool(cpu_count())
# Pool.imap()接收可迭代对象为参数,迭代入进程池处理。
# collection.find() 返回mongodb查询游标,可迭代
iter = pool.imap(deal_doc, collection.find())
# 迭代iter,获取计算结果。每迭代一次,利用一个线程,进行一次任务
for result in iter:
print(result)
网友评论