需求
串行:preprocessing --> fuc_1 --> fuc_2 --> fuc_3 --> postprocessing
并行:preprocessing --> (fuc_1 fuc_2 fuc_3并行执行) --> postprocessing
其中实现10是最有效率的。
实现1
可以满足需求,但效率不高,需要不断地 建立子进程、销毁子进程
import multiprocessing
from multiprocessing import Manager


def worker(procnum, return_dict):
    """Worker: store its own number in the shared dict as the 'return value'."""
    print(str(procnum) + ' represent!')
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = Manager()
    # return_list = manager.list() would work just as well
    return_dict = manager.dict()  # proxy dict shared across processes
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()  # wait for every child before reading the results
    print(return_dict.values())
实现2
使用进程的队列multiprocessing.Queue,put(),get()方法
子进程不需返回值,将目标结果放入队列中
在主进程中获取 get方法
import random
import time
import multiprocessing


def worker_1(k, q):
    """Sum 10 random ints in [1, 3] and push the total onto the queue."""
    t = 0
    print("process-", k)
    for i in range(10):
        x = random.randint(1, 3)
        t += x
    q.put(t)


def worker_2(k, q):
    """Sum 5 random ints in [1, 3] and push the total onto the queue."""
    t = 0
    print("process-", k)
    for i in range(5):
        x = random.randint(1, 3)
        t += x
    q.put(t)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    jobs = []
    p = multiprocessing.Process(target=worker_1, args=('1', q))
    jobs.append(p)
    p.start()
    p = multiprocessing.Process(target=worker_2, args=('2', q))
    jobs.append(p)
    p.start()
    # NOTE: join() before get() is fine for tiny payloads like these single
    # ints, but can deadlock when children put large data on the queue.
    for p in jobs:
        p.join()
    # one get() per job; order follows queue insertion, not start order
    results = [q.get() for j in jobs]
    print(results)
实现3
使用进程池
from multiprocessing import Pool
import time


def func_1(i):
    """Square i after 1 s of simulated work."""
    time.sleep(1)
    return i * i


def func_2(i):
    """Cube i after 2 s of simulated work."""
    time.sleep(2)
    return i * i * i


if __name__ == '__main__':
    # with-blocks terminate the worker processes when done (the original
    # leaked both pools)
    with Pool(5) as p1, Pool(5) as p2:
        # pool.map() is synchronous: the second map only starts after the
        # first has finished, so the two pools do NOT overlap here
        ret_1 = p1.map(func_1, range(10))
        ret_2 = p2.map(func_2, range(10))
        print(ret_1)
        print(ret_2)
实现4
使用pool.apply_async() 异步执行
pool.map_async() 异步执行+列表推导
from multiprocessing import Pool
import time


def func_1(*args):
    """Simulate 2 s of work, then echo back the three fields of the call.

    apply_async(func_1, ('lim', 2020, 65.8)) passes three positionals, so the
    fields are args[0..2].  (The original indexed args[0][0..2], which sliced
    the string 'lim' into single characters.)
    """
    time.sleep(2)
    print(args)
    return args[0], args[1], args[2]


def func_2(i):
    """Cube i after 3 s of simulated work."""
    time.sleep(3)
    return {'ans': i * i * i}


if __name__ == '__main__':
    start = time.time()
    # Create the pools inside the main guard: at module level they would be
    # rebuilt on every import (and break under the 'spawn' start method),
    # and the with-block closes them when done.
    with Pool(5) as p1, Pool(5) as p2:
        # apply_async / map_async return immediately; .get() blocks, so the
        # two pools overlap their work.  (The original also ran a redundant
        # synchronous p2.map over the same inputs, doing the work twice.)
        res = p1.apply_async(func_1, ('lim', 2020, 65.8))
        ret_2 = p2.map_async(func_2, range(5))
        print(res.get())
        print(ret_2.get())
    print(time.time() - start)
实现5
- 用一个进程池来并行执行不同的函数,一个进程池的开销小一些
- 如果用多个进程池,则进程池之间需要采用异步执行
from multiprocessing import Pool
import time


def func_1(*args):
    """Echo back the three positional arguments after 2 s of simulated work."""
    time.sleep(2)
    print(args)
    # called as func_1('lim', 2020, 65.8): fields are args[0..2] (the
    # original indexed args[0][0..2], slicing 'lim' into characters)
    return args[0], args[1], args[2]


def func_2(i):
    """Cube i after 3 s of simulated work."""
    time.sleep(3)
    return {'ans': i * i * i}


def func3():
    """Print a marker; returns None."""
    print(3)


def funMain(i):
    """Dispatcher: index 0 runs func_1, anything else runs func3.

    Returning the sub-result lets pool.map collect it (the original dropped
    it, so map returned [None, None]).
    """
    if i == 0:
        return func_1('lim', 2020, 65.8)
    return func3()


if __name__ == '__main__':
    start = time.time()
    # one pool running different functions via the dispatcher: less overhead
    # than one pool per function (the original also built an unused Pool(5))
    with Pool(2) as pool:
        ret_2 = pool.map(funMain, range(2))
    print(ret_2)
    print(time.time() - start)
实现6
用一个进程池多进程执行多个函数并收集结果
from multiprocessing import Pool  # process pool
import time  # timing


def func_1(*args):
    """Echo back three positional arguments after 2 s of simulated work."""
    time.sleep(2)
    print(args)
    return args[0], args[1], args[2]


def func_2(i):
    """Cube i after 3 s of simulated work."""
    time.sleep(3)
    return {'ans': i * i * i}


def func3():
    """Return 5 after 5 s of simulated work."""
    print(5)
    time.sleep(5)
    return 5


if __name__ == '__main__':
    start = time.time()
    # build the pool inside the main guard (safe under 'spawn') and let the
    # with-block close it; the original created it at import time and leaked it
    with Pool(2) as p1:
        # submit three DIFFERENT functions to the same pool, collect via .get()
        res1 = p1.apply_async(func_1, ('lim', 2020, 65.8))
        res2 = p1.apply_async(func_2, [2])
        res3 = p1.apply_async(func3)
        print(res1.get())
        print(res2.get())
        print(res3.get())
    print(time.time() - start)
实现7
线程池嵌套进程池
# coding=utf-8
# thread pool on the outside, one process pool per thread task
from multiprocessing import Pool  # process pool
import time  # timing
from concurrent.futures import ThreadPoolExecutor


def func_1(*args):
    """Echo back three positional arguments after 2 s of simulated work."""
    time.sleep(2)
    print(args)
    return args[0], args[1], args[2]


def func_2(i):
    """Cube i after 3 s of simulated work."""
    time.sleep(3)
    return {'ans': i * i * i}


def func3():
    """Return 5 after 5 s of simulated work."""
    print(5)
    time.sleep(5)
    return 5


def f(txt):
    """Run the three helpers in a private process pool, then return txt."""
    start = time.time()
    with Pool(3) as pool:  # close the pool when this task is done
        res1 = pool.apply_async(func_1, ('lim', 2020, 65.8))
        res2 = pool.apply_async(func_2, [2])
        res3 = pool.apply_async(func3)
        print(res1.get())
        print(res2.get())
        print(res3.get())
    print(time.time() - start)
    return txt


if __name__ == '__main__':
    start = time.time()
    # the with-block shuts the executor down once both tasks finish
    with ThreadPoolExecutor(max_workers=2) as executor:
        # submit() returns a Future immediately without blocking;
        # futures also offer done() and cancel() (cancel only succeeds
        # before the task has been scheduled)
        task1 = executor.submit(f, 'hello')
        task2 = executor.submit(f, 'world')
        print(task1.result())  # collect the thread return values
        print(task2.result())
    print(time.time() - start)
实现8
线程池嵌套线程池
# coding=utf-8
# thread pool nested inside a thread pool (multithread-multithread)
# NOTE: CPython threads share the GIL, so CPU-bound work may not speed up
from multiprocessing import Pool  # unused in this variant, kept for parity
import time  # timing
from concurrent.futures import ThreadPoolExecutor


def func_1(*args):
    """Echo back the three fields of a single tuple argument after 2 s."""
    time.sleep(2)
    print(args)
    args = args[0]  # submit() passed one tuple positionally; unpack it
    return args[0], args[1], args[2]


def func_2(i):
    """Cube i after 3 s of simulated work."""
    time.sleep(3)
    return {'ans': i * i * i}


def func3():
    """Return 5 after 5 s of simulated work."""
    print(5)
    time.sleep(5)
    return 5


def f(txt):
    """Run the three helpers in an inner thread pool, then return txt."""
    start = time.time()
    with ThreadPoolExecutor(max_workers=3) as executor:  # shut down when done
        task1 = executor.submit(func_1, ('lim', 2020, 65.8))
        task2 = executor.submit(func_2, 2)
        task3 = executor.submit(func3)
        print(task1.result())  # collect the thread return values
        print(task2.result())
        print(task3.result())
    print(time.time() - start)
    return txt


if __name__ == '__main__':
    start = time.time()
    # outer executor: two f() tasks run concurrently
    with ThreadPoolExecutor(max_workers=2) as executor:
        # submit() returns a Future immediately; done()/cancel()/result()
        # are available on the returned futures
        task1 = executor.submit(f, 'hello')
        task2 = executor.submit(f, 'world')
        print(task1.result())
        print(task2.result())
    print(time.time() - start)
实现9
进程池嵌套线程池
可以实现简单的子函数用多线程计算,复杂的函数用多进程计算
# coding=utf-8
# process pool on the outside, thread pool inside each worker process:
# cheap sub-tasks run on threads, the heavy wrappers run in processes
from multiprocessing import Pool  # process pool
import time  # timing
from concurrent.futures import ThreadPoolExecutor


def func_1(*args):
    """Echo back the three fields of a single tuple argument after 2 s."""
    time.sleep(2)
    print(args)
    args = args[0]  # submit() passed one tuple positionally; unpack it
    return args[0], args[1], args[2]


def func_2(i):
    """Cube i after 3 s of simulated work."""
    time.sleep(3)
    return {'ans': i * i * i}


def func3():
    """Return 5 after 5 s of simulated work."""
    print(5)
    time.sleep(5)
    return 5


def f(txt):
    """Run the three helpers in an inner thread pool, then return txt."""
    start = time.time()
    with ThreadPoolExecutor(max_workers=3) as executor:  # shut down when done
        task1 = executor.submit(func_1, ('lim', 2020, 65.8))
        task2 = executor.submit(func_2, 2)
        task3 = executor.submit(func3)
        print(task1.result())  # collect the thread return values
        print(task2.result())
        print(task3.result())
    print(time.time() - start)
    return txt


if __name__ == '__main__':
    start = time.time()
    # outer process pool: each f() task gets its own process (the original
    # never closed the pool; the with-block does)
    with Pool(2) as pool:
        res1 = pool.apply_async(f, ('hello',))
        res2 = pool.apply_async(f, ('world',))
        print(res1.get())
        print(res2.get())
    print(time.time() - start)
实现10
开大进程池的进程个数避免使用多线程,因为python的多线程是伪的多线程,计算密集时起不到加速作用,还是用多进程比较靠谱。
# coding=utf-8
# processes only: grow the pool instead of mixing in threads
from multiprocessing import Pool  # process pool
import time  # timing


# sub-task 1
def func_1(*args):
    """Echo back the three positional arguments after 2 s of simulated work."""
    time.sleep(2)
    return args[0], args[1], args[2]


# sub-task 2
def func_2(i):
    """Cube i after 3 s of simulated work."""
    time.sleep(3)
    return {'ans': i * i * i}


# sub-task 3
def func_3():
    """Take no input; return 5 after 5 s of simulated work."""
    print(5)
    time.sleep(5)
    return 5


# aggregation step
def f(a, b, c):
    """Combine one result from each sub-task into a tuple."""
    return a, b, c


if __name__ == '__main__':
    start = time.time()
    # 6 workers: enough to run every sub-task of both input rows at once;
    # the with-block closes the pool (the original leaked it)
    with Pool(6) as pool:
        # per-sub-task input lists (one argument tuple per call)
        input1 = [('lim', 2020, 65.8), ('liuts', 2020, 65.8)]
        input2 = [(2,), (3,)]
        input3 = [(), ()]
        # fan out: submit every call asynchronously so they all overlap
        res1 = [pool.apply_async(func_1, e) for e in input1]
        res2 = [pool.apply_async(func_2, e) for e in input2]
        res3 = [pool.apply_async(func_3, e) for e in input3]
        # collect: .get() blocks until each individual result is ready
        res1 = [e.get() for e in res1]
        res2 = [e.get() for e in res2]
        res3 = [e.get() for e in res3]
        # aggregate in parallel too (a serial zip loop would also work)
        final_res = [pool.apply_async(f, (a, b, c))
                     for a, b, c in zip(res1, res2, res3)]
        final_res = [e.get() for e in final_res]
    print(final_res)
    print(time.time() - start)
网友评论