Python 3: Simple Multi-Process Parallelism with multiprocessing


Author: 陈葡萄 | Published 2018-10-10 17:21

    1. A simple example

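    # -*- coding: utf-8 -*-
    
    import multiprocessing
    import queue
    from multiprocessing import Pool
    
    # Global Pool, so the callback can terminate the workers once the wanted result has arrived
    MP_POOL = None
    
    # The task function handed to the Pool; it calls the class method indirectly,
    # because a module-level function is always picklable by reference
    def parallel_test(i):
        tt = TT()
        return tt.test(i)
    
    # Callback for apply_async: it runs once per successful result, in a
    # result-handler thread of the main process (not in the worker)
    def async_callback(d):
        global MP_POOL
        print(">>>>>>>>>>>>>callback d: {0}<<<<<<<<<<<<<<".format(d))
        if d == 13:
            print("=============terminate pool=============")
            MP_POOL.terminate()
    
    # The class whose method runs in parallel
    class TT(object):
        def __init__(self):
            pass
        
        def test(self, i):
            print("----------------in TT.test i: {0}----------------".format(i))
            return i
    
    
    class SimpleParallel(object):
        def __init__(self):
            # The queue is only used inside the main process, so queue.Queue is enough.
            # multiprocessing.Queue.empty() can still return True right after put(),
            # because a background feeder thread flushes the items into a pipe.
            self.myqueue = queue.Queue()
    
        def run(self):
            for i in (10, 15, 13, 20, 21, 30, 11, 19, 25):
                self.myqueue.put(i)
    
            try:
                self.async_track()
            except Exception as e:
                # e.g. ValueError("Pool not running") if the callback terminates the pool
                # while tasks are still being submitted; report it instead of hiding it
                print("async_track stopped: {0!r}".format(e))
    
        def async_track(self):
            global MP_POOL
            # Without the processes argument, Pool also defaults to os.cpu_count() workers
            MP_POOL = Pool(processes=multiprocessing.cpu_count())
            while not self.myqueue.empty():
                i = self.myqueue.get()
                # func must be picklable: a module-level function always is. In Python 3 a
                # bound method such as TT().test also works, as long as its instance pickles.
                MP_POOL.apply_async(parallel_test, (i, ), callback=async_callback)
    
            # After close(), no more tasks can be submitted to the Pool
            MP_POOL.close()
            # join() blocks the main process until every worker process in the Pool has exited
            MP_POOL.join()
    
    
    if __name__ == "__main__":
        print("start...")
    
        stt = SimpleParallel()
        stt.run()
    
        print("...done")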
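An alternative way to stop early, sketched here rather than taken from the author: since Python 3.3 a Pool can be used as a context manager whose exit calls terminate(). Consuming results with imap_unordered (completion order) and breaking out of the loop once the wanted value (13, as above) arrives keeps the stop decision in the main thread, so no global MP_POOL or callback is needed. The function name find_until is hypothetical, and parallel_test is reduced to a stand-in for TT().test.

```python
from multiprocessing import Pool


def parallel_test(i):
    # Stand-in for TT().test(i) from the example above
    return i


def find_until(items, target, processes=None):
    """Collect results in completion order, stopping at the first `target`."""
    seen = []
    with Pool(processes=processes) as pool:
        # imap_unordered yields each result as soon as any worker finishes it
        for d in pool.imap_unordered(parallel_test, items):
            seen.append(d)
            if d == target:
                break  # leaving the with-block calls pool.terminate()
    return seen


if __name__ == "__main__":
    print(find_until([10, 15, 13, 20, 21, 30, 11, 19, 25], 13))
```

Which values appear before 13 can still vary between runs, just as in the callback version.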
    
    

    2. Output
    The output may differ between runs, because the worker processes finish in no fixed order.

    start...
    ----------------in TT.test i: 10----------------
    ----------------in TT.test i: 15----------------
    >>>>>>>>>>>>>callback d: 10<<<<<<<<<<<<<<
    ----------------in TT.test i: 13----------------
    >>>>>>>>>>>>>callback d: 15<<<<<<<<<<<<<<
    >>>>>>>>>>>>>callback d: 13<<<<<<<<<<<<<<
    ----------------in TT.test i: 21----------------
    =============terminate pool=============
    ----------------in TT.test i: 20----------------
    ...done
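Callbacks fire in completion order, which is why the output above changes from run to run. If the results are needed in submission order, one option is to keep the AsyncResult objects that apply_async returns and call get() on each. get() also re-raises any exception raised inside the worker, which a callback-only setup silently drops. A minimal sketch; square and run_in_order are hypothetical names:

```python
from multiprocessing import Pool


def square(i):
    # Hypothetical task that fails on negative input
    if i < 0:
        raise ValueError("negative input: {0}".format(i))
    return i * i


def run_in_order(items, processes=2):
    with Pool(processes=processes) as pool:
        # Submit everything first so the tasks still run in parallel
        pending = [pool.apply_async(square, (i,)) for i in items]
        # get() waits for that task and re-raises the worker's exception, if any
        return [r.get(timeout=30) for r in pending]


if __name__ == "__main__":
    print(run_in_order([10, 15, 13]))  # [100, 225, 169]
```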
    

    3. Pitfalls I ran into

    • Running from the Mac OS X terminal fails (the Objective-C fork-safety check added in macOS 10.13) with: may have been in progress in another thread when fork() was called

    Add an environment variable to ~/.bash_profile, then reload it:
    vi ~/.bash_profile

    export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
    

    source ~/.bash_profile
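Another way around this error, assuming Python 3.4 or later (for get_context), is not to fork at all: the "spawn" start method launches each worker as a fresh interpreter instead of a fork()ed copy of the parent, and Python 3.8 already made it the default on macOS. A minimal sketch with a hypothetical task work; with spawn, the task must live at module level and the `__main__` guard is mandatory.

```python
import multiprocessing


def work(i):
    # Hypothetical task; module level so spawned workers can import it
    return i * 2


def run_with_spawn(items):
    # get_context("spawn") selects the start method for this pool only,
    # without changing the global default via set_start_method()
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        return pool.map(work, items)


if __name__ == "__main__":
    # Required with spawn: each worker re-imports this module
    print(run_with_spawn([1, 2, 3]))  # [2, 4, 6]
```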

    • Debugging in VS Code on Mac OS X fails with the same error: may have been in progress in another thread when fork() was called

    Add the environment variable to launch.json:

    "env": {
        "OBJC_DISABLE_INITIALIZE_FORK_SAFETY": "YES"
    }
    
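    • TypeError: can't pickle _thread.lock objects

    Pool pickles the task function and its arguments to hand them to the worker processes, and this error means something being pickled holds a lock, typically a bound method whose instance has a lock or thread-safe queue attribute. Either make the class picklable, or call its method indirectly from a top-level function, as parallel_test does with tt.test in the example.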
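A quick way to find the culprit is to call pickle.dumps on the function and arguments directly, outside the Pool. The sketch below assumes Python 3, where a bound method pickles as long as its instance does; HoldsLock is a hypothetical class whose threading.Lock attribute reproduces the TypeError.

```python
import pickle
import threading


class TT(object):
    # Same shape as TT in the example: no unpicklable attributes
    def test(self, i):
        return i


class HoldsLock(object):
    # Hypothetical class: the lock attribute makes its instances unpicklable
    def __init__(self):
        self.lock = threading.Lock()

    def test(self, i):
        return i


def is_picklable(obj):
    """Return True if pickle can serialize obj, as Pool must do for func and args."""
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


if __name__ == "__main__":
    print(is_picklable(TT().test))         # True
    print(is_picklable(HoldsLock().test))  # False: pickling the instance hits the lock
```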

    4. References
    https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool
    https://blog.csdn.net/u010483897/article/details/82222743
    https://blog.csdn.net/wen61995120/article/details/80319077
    https://blog.csdn.net/u010483897/article/details/82221340
    http://sealiesoftware.com/blog/archive/2017/6/5/Objective-C_and_fork_in_macOS_1013.html
    https://blog.csdn.net/dutsoft/article/details/70336462
    https://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-multiprocessing-pool-map


    Article link: https://www.haomeiwen.com/subject/nxfyaftx.html