1、简单例子代码
# -*- coding: utf-8 -*-
import multiprocessing
from multiprocessing import Pool, Queue
# 全局Pool方便在已产生结果数据时中止进程
global MP_POOL
# class的方法须放在top级函数中间接调用
def parallel_test(i):
tt = TT()
return tt.test(i)
# 供并行进程异步回调的函数
def async_callback(d):
global MP_POOL
print(">>>>>>>>>>>>>callback d: {0}<<<<<<<<<<<<<<".format(d))
if d == 13:
print("=============terminate pool=============")
MP_POOL.terminate()
# 被并行调用的class
class TT(object):
def __init__(self):
pass
def test(self, i):
print("----------------in TT.test i: {0}----------------".format(i))
return i
class SimpleParallel(object):
def __init__(self):
self.myqueue = Queue()
def run(self):
self.myqueue.put(10)
self.myqueue.put(15)
self.myqueue.put(13)
self.myqueue.put(20)
self.myqueue.put(21)
self.myqueue.put(30)
self.myqueue.put(11)
self.myqueue.put(19)
self.myqueue.put(25)
try:
self.async_track()
except Exception as e:
pass
def async_track(self):
global MP_POOL
# Pool不指定processes参数,默认也会用cpu_count初始化
MP_POOL = Pool(processes=multiprocessing.cpu_count())
while not self.myqueue.empty():
i = self.myqueue.get()
# apply_async的func参数只能传递top级module中定义的函数
MP_POOL.apply_async(parallel_test, (i, ), callback=async_callback)
# Pool的close调用后,就不能再加入更多的进程了
MP_POOL.close()
# Pool的join阻塞主进程,直到Pool中的进程全部结束
MP_POOL.join()
if __name__ == "__main__":
print("start...")
stt = SimpleParallel()
stt.run()
print("...done")
2、输出结果:
多次运行,输出可能会有不同
start...
----------------in TT.test i: 10----------------
----------------in TT.test i: 15----------------
>>>>>>>>>>>>>callback d: 10<<<<<<<<<<<<<<
----------------in TT.test i: 13----------------
>>>>>>>>>>>>>callback d: 15<<<<<<<<<<<<<<
>>>>>>>>>>>>>callback d: 13<<<<<<<<<<<<<<
----------------in TT.test i: 21----------------
=============terminate pool=============
----------------in TT.test i: 20----------------
...done
3、踩到的坑
- Mac OS X控制台下运行出错:may have been in progress in another thread when fork() was called
修改~/.bash_profile添加环境变量
vi ~/.bash_profileexport OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
source ~/.bash_profile
- Mac OS X用VS Code调试出错:may have been in progress in another thread when fork() was called
修改launch.json添加环境变量:
"env": { "OBJC_DISABLE_INITIALIZE_FORK_SAFETY": "YES" }
- TypeError: can't pickle _thread.lock objects
自定义class的方法需要转换成支持pickle的对象,或者放在top级函数中间接调用(比如例子中tt.test)
4、参考资料
https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool
https://blog.csdn.net/u010483897/article/details/82222743
https://blog.csdn.net/wen61995120/article/details/80319077
https://blog.csdn.net/u010483897/article/details/82221340
http://sealiesoftware.com/blog/archive/2017/6/5/Objective-C_and_fork_in_macOS_1013.html
https://blog.csdn.net/dutsoft/article/details/70336462
https://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-multiprocessing-pool-map
网友评论