Returning values from a child process to the parent process in Python

Author: 汉江岳 | Published 2021-02-19 21:51

    Requirement

    Serial: preprocessing --> func_1 --> func_2 --> func_3 --> postprocessing
    Parallel: preprocessing --> (func_1, func_2 and func_3 run in parallel) --> postprocessing
    Implementation 10 below is the most efficient; a minimal sketch of the parallel pipeline follows.
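
    A minimal sketch of the parallel variant, assuming placeholder preprocessing(), postprocessing() and sub-task bodies (not the real ones), using the same apply_async pattern as Implementation 10 below:

    from multiprocessing import Pool

    def preprocessing():           # placeholder: produce the shared input
        return 10

    def func_1(x):                 # placeholder sub-tasks
        return x + 1

    def func_2(x):
        return x * 2

    def func_3(x):
        return x ** 2

    def postprocessing(a, b, c):   # placeholder: combine the three results
        return a + b + c

    if __name__ == '__main__':
        data = preprocessing()
        with Pool(3) as pool:
            # submit the three sub-tasks so they run concurrently in separate processes
            r1 = pool.apply_async(func_1, (data,))
            r2 = pool.apply_async(func_2, (data,))
            r3 = pool.apply_async(func_3, (data,))
            # get() blocks until each child process has sent back its return value
            print(postprocessing(r1.get(), r2.get(), r3.get()))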

    Implementation 1

    This meets the requirement, but it is not very efficient: a fresh set of child processes has to be created and destroyed for every run.

    import multiprocessing
    from multiprocessing import Manager
    def worker(procnum, return_dict):
        '''worker function'''
        print(str(procnum) + ' represent!')
        return_dict[procnum] = procnum
    
    
    if __name__ == '__main__':
        manager = Manager()
        # return_list = manager.list()  # a managed list works just as well
        return_dict = manager.dict()
        jobs = []
        for i in range(5):
            p = multiprocessing.Process(target=worker, args=(i,return_dict))
            jobs.append(p)
            p.start()
    
        for proc in jobs:
            proc.join()
        print(return_dict.values())
    

    Implementation 2

    Use the process-safe queue multiprocessing.Queue with its put() and get() methods.
    The child processes do not return values directly; each one puts its result into the queue,
    and the parent process then collects the results with get().

    import random
    import time
    import multiprocessing
    
    
    def worker_1(k,q):
        t = 0
        print("process-", k)
        for i in range(10):
            x = random.randint(1, 3)
            t += x
        q.put(t)
    
    def worker_2(k,q):
        t = 0
        print("process-", k)
        for i in range(5):
            x = random.randint(1, 3)
            t += x
        q.put(t)
    
    if __name__ == '__main__': 
        q = multiprocessing.Queue()
        jobs = []
        
        p = multiprocessing.Process(target=worker_1, args=('1', q))
        jobs.append(p)
        p.start()
    
        p = multiprocessing.Process(target=worker_2, args=('2', q))
        jobs.append(p)
        p.start()
        
        for p in jobs:
            p.join()
        
        results = [q.get() for j in jobs]
        print(results)
    

    Implementation 3

    Use a process pool (multiprocessing.Pool). Because Pool.map() blocks until all of its results are ready, the two map() calls below still run one after the other.

    from multiprocessing import Pool
    import time
     
    def func_1(i):
        time.sleep(1)
        return  i*i
    
     
    def func_2(i):
        time.sleep(2)
        return  i*i*i
     
    if __name__ == '__main__':
        p1 = Pool(5)
        p2 = Pool(5)
        ret_1 = p1.map(func_1, range(10))  # pool.map() is synchronous: it blocks until all results are ready
        ret_2 = p2.map(func_2,range(10))
        print(ret_1)
        print(ret_2)
    

    Implementation 4

    Use pool.apply_async() to run a single call asynchronously,
    and pool.map_async() to run a whole list of inputs asynchronously.

    from multiprocessing import Pool
    import time
    
    def func_1(*args):  # *args collects the positional arguments
        time.sleep(2)
        print(args)
        return args[0], args[1], args[2]  # apply_async() unpacks the argument tuple, so the three values arrive separately
     
    def func_2(i):
        time.sleep(3)
        return  {'ans': i*i*i}
     
    # Create the process pools first; here they are placed at module level
    p1 = Pool(5)
    p2 = Pool(5)
    
    if __name__ == '__main__':

        start = time.time()
        # apply_async() submits one call to p1 and returns an AsyncResult immediately
        res = p1.apply_async(func_1, ('lim', 2020, 65.8))
        # map_async() submits the whole iterable to p2 and also returns immediately,
        # so the work in the two pools overlaps
        ret_2 = p2.map_async(func_2, range(5))
        print(res.get())    # blocks until func_1 has finished
        print(ret_2.get())  # blocks until every func_2 call has finished
        print(time.time() - start)


    Implementation 5

    • Use a single process pool to run the different functions in parallel; one pool has less overhead.
    • If multiple process pools are used instead, the calls to the different pools must be made asynchronously.

    from multiprocessing import Pool
    import time
    
    def func_1(*args):  # *args collects the positional arguments
        time.sleep(2)
        print(args)
        return args[0], args[1], args[2]  # funMain calls func_1 with three separate arguments
     
    
    def func_2(i):
        time.sleep(3)
        return  {'ans': i*i*i}
    
    def func3():
        print(3)
    
    def funMain(i):
        if i == 0:
            func_1('lim', 2020, 65.8)
        else:
            func3() 
     
    p1 = Pool(2)
    p2 = Pool(5)
    
    if __name__ == '__main__':

        start = time.time()
        # Alternative with multiple pools, which then have to be driven asynchronously:
        # res = p1.apply_async(func_1, ('lim', 2020, 65.8))
        # ret_2 = p2.map_async(func_2, range(5))
        ret_2 = p2.map(funMain, range(2))  # run the different functions in parallel with a single pool (lower overhead)
        # print(res.get())
        print(ret_2)
        print(time.time() - start)
    

    Implementation 6

    Run several different functions in a single process pool and collect their results.

    from multiprocessing import Pool  # process pool
    import time  # timing


    def func_1(*args):  # *args collects the positional arguments
        time.sleep(2)
        print(args)
        return args[0], args[1], args[2]  # the three arguments
    
    def func_2(i):
        time.sleep(3)
        return  {'ans': i*i*i}
    
    def func3():
        print(5)
        time.sleep(5)
        return 5
    
     
    p1 = Pool(2)
    
    
    if __name__ == '__main__':
        start = time.time()
        res1 = p1.apply_async(func_1, ('lim', 2020, 65.8))
        res2 = p1.apply_async(func_2, [2])
        res3 = p1.apply_async(func3,)
        print(res1.get())
        print(res2.get())
        print(res3.get())
        print(time.time() - start)
    

    Implementation 7

    A thread pool whose tasks each create their own process pool (thread pool nesting process pools).

    #coding=utf-8
    # thread pool nesting process pools

    from multiprocessing import Pool  # process pool
    import time  # timing
    from concurrent.futures import ThreadPoolExecutor


    def func_1(*args):  # *args collects the positional arguments
        time.sleep(2)
        print(args)
        return args[0], args[1], args[2]  # the three arguments
    
    def func_2(i):
        time.sleep(3)
        return  {'ans': i*i*i}
    
    def func3():
        print(5)
        time.sleep(5)
        return 5
    
    def f(txt):
        start = time.time()
        pool = Pool(3)
        res1 = pool.apply_async(func_1, ('lim', 2020, 65.8))
        res2 = pool.apply_async(func_2, [2])
        res3 = pool.apply_async(func3,)
        print(res1.get())
        print(res2.get())
        print(res3.get())
        print(time.time() - start)
        return txt
    
    
    
    if __name__ == '__main__':
        start = time.time()
        
        executor = ThreadPoolExecutor(max_workers=2)
        # submit() hands a function to the thread pool and returns a Future immediately, without blocking
        task1 = executor.submit(f, ('hello'))
        task2 = executor.submit(f, ('world'))
        # done() reports whether a task has finished
        # print(task1.done())
        # cancel() cancels a task; it only succeeds if the task has not started running yet
        # print(task2.cancel())
        # time.sleep(4)
        # print(task1.done())
        # result() retrieves a task's return value
        print(task1.result())  # collect the thread's return value
        print(task2.result())
        print(time.time() - start)
    

    Implementation 8

    A thread pool whose tasks each create their own thread pool (thread pool nesting thread pools).

    #coding=utf-8
    # thread pool nesting thread pools (multithread-multithread)
    # Threads give only pseudo-parallelism for Python code, so compute-heavy work may see no speedup

    from multiprocessing import Pool  # process pool (not used in this example)
    import time  # timing
    from concurrent.futures import ThreadPoolExecutor


    def func_1(*args):  # *args collects the positional arguments
        time.sleep(2)
        print(args)
        args = args[0]  # submit() passes the tuple as a single argument, so unpack it here
        return args[0], args[1], args[2]  # the three arguments
    
    def func_2(i):
        time.sleep(3)
        return  {'ans': i*i*i}
    
    def func3():
        print(5)
        time.sleep(5)
        return 5
    
    def f(txt):
        start = time.time()
        executor = ThreadPoolExecutor(max_workers=3)
        # submit() hands a function to the thread pool and returns a Future immediately, without blocking
        task1 = executor.submit(func_1, ('lim', 2020, 65.8))
        task2 = executor.submit(func_2, (2))
        task3 = executor.submit(func3,)
        print(task1.result())  # collect the thread's return value
        print(task2.result())
        print(task3.result())
        print(time.time() - start)
        return txt
    
    
    
    if __name__ == '__main__':
        start = time.time()
        
        executor = ThreadPoolExecutor(max_workers=2)
        # submit() hands a function to the thread pool and returns a Future immediately, without blocking
        task1 = executor.submit(f, ('hello'))
        task2 = executor.submit(f, ('world'))
        # done() reports whether a task has finished
        # print(task1.done())
        # cancel() cancels a task; it only succeeds if the task has not started running yet
        # print(task2.cancel())
        # time.sleep(4)
        # print(task1.done())
        # result() retrieves a task's return value
        print(task1.result())  # collect the thread's return value
        print(task2.result())
        print(time.time() - start)
    

    Implementation 9

    A process pool whose tasks each create their own thread pool (process pool nesting thread pools).
    This lets simple sub-functions run in threads while the heavier work is spread across processes.

    #coding=utf-8
    # process pool nesting thread pools

    from multiprocessing import Pool  # process pool
    import time  # timing
    from concurrent.futures import ThreadPoolExecutor


    def func_1(*args):  # *args collects the positional arguments
        time.sleep(2)
        print(args)
        args = args[0]  # submit() passes the tuple as a single argument, so unpack it here
        return args[0], args[1], args[2]  # the three arguments
    
    def func_2(i):
        time.sleep(3)
        return  {'ans': i*i*i}
    
    def func3():
        print(5)
        time.sleep(5)
        return 5
    
    def f(txt):
        start = time.time()
        executor = ThreadPoolExecutor(max_workers=3)
        # submit() hands a function to the thread pool and returns a Future immediately, without blocking
        task1 = executor.submit(func_1, ('lim', 2020, 65.8))
        task2 = executor.submit(func_2, (2))
        task3 = executor.submit(func3,)
        print(task1.result())  # collect the thread's return value
        print(task2.result())
        print(task3.result())
        print(time.time() - start)
        return txt
    
    
    
    if __name__ == '__main__':
        start = time.time()
        pool = Pool(2)
        res1 = pool.apply_async(f, ('hello',))
        res2 = pool.apply_async(f, ('world',))
        print(res1.get())
        print(res2.get())
        print(time.time() - start)
    

    Implementation 10

    Increase the number of worker processes in the pool and avoid multithreading altogether: Python's threads are effectively pseudo-parallel (the GIL keeps them from running Python bytecode concurrently), so they bring no speedup for compute-bound work; multiple processes are the more reliable choice.

    #coding=utf-8
    # use only a process pool, with more worker processes
    from multiprocessing import Pool  # process pool
    import time  # timing

    # sub-function 1
    def func_1(*args):  # *args collects the positional arguments
        time.sleep(2)
        # print(args)
        return args[0], args[1], args[2]  # the three arguments
    
    # sub-function 2
    def func_2(i):
        time.sleep(3)
        return {'ans': i*i*i}

    # sub-function 3
    def func_3():
        print(5)
        time.sleep(5)
        return 5

    # aggregation (postprocessing) logic
    def f(a,b,c): 
        return a, b, c
    
    
    if __name__ == '__main__':
        start = time.time()
        pool = Pool(6)  # create a process pool with as many workers as needed
    
        # input lists for the different sub-functions
        input1 = [('lim', 2020, 65.8), ('liuts', 2020, 65.8)]
        input2 = [(2,), (3,)]
        input3 = [(), ()]
    
        # run the sub-functions asynchronously to process the input lists in parallel
        res1 = [pool.apply_async(func_1, e) for e in input1]
        res2 = [pool.apply_async(func_2, e) for e in input2]
        res3 = [pool.apply_async(func_3, e) for e in input3]
    
        # collect the results
        res1 = [e.get() for e in res1]
        res2 = [e.get() for e in res2]
        res3 = [e.get() for e in res3]
    
        # # serial aggregation:
        # for a, b, c in zip(res1, res2, res3):
        #     print(f(a, b, c))  # one record at a time
    
        # parallel aggregation
        final_res = [pool.apply_async(f, (a,b,c)) for a,b,c in zip(res1, res2, res3)]
        final_res = [e.get() for e in final_res]
        print(final_res)
        
        print(time.time() - start)
    
