37.Python编程:多进程multiprocessing

作者: TensorFlow开发者 | 来源:发表于2018-08-10 12:44 被阅读9次

    前言

    前面我们了解了计算机中的CPU多任务原理,知道了进程和线程的概念。今天就通过代码看Python中多进程是如何实现的。

    Linux\Unix多进程:os.fork()

    Linux\Unix操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

    fork()返回结果:子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

    Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

    import os
    pid = os.fork()
    if pid == 0:
        print("这个是子进程,pid是{}, 父进程pid是{}".format(os.getpid(), os.getppid()))
    else:
        print("这个是父进程,pid是{}, 创建了个子进程pid是{}".format(os.getpid(), pid))
    

    提示:os.fork()只在Linux\Unix系统下可以使用。Mac系统是基于Unix系统开发的,所以也支持这种写法。如果你是在Windows系统下做的Python开发,那么运行上面代码,会报错:

    Traceback (most recent call last):
      File "F:/python_projects/multi_process/process_thread.py", line 2, in <module>
        pid = os.fork()
    AttributeError: module 'os' has no attribute 'fork'
    

    由于Windows没有fork调用,但Python是支持跨平台的,所以Windows上用Python编写多进程的程序,就需要借用multiprocessing模块了。

    跨平台多进程:multiprocessing

    multiprocessing模块提供了一个Process类来代表一个进程对象。通常可导入模块:
    from multiprocessing import Process

    示例:

    from multiprocessing import Process
    from time import sleep
    import os
    
    
    # 定义函数
    def my_run(arg):
        print("子进程启动")
        sleep(5)
        print("子进程结束")
    
    
    if __name__ == "__main__":
        print("主进程已经开始启动:{}".format(os.getpid()))
    
        # 创建子进程
        p = Process(target=my_run, args=("子进程",))
    
        # 启动子进程
        p.start()
    
        # join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步
        p.join()
    
        print("父进程结束")
    

    运行结果:

    主进程已经开始启动:26308
    子进程启动
    子进程结束
    父进程结束
    

    提示:上面例子中join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。说白了就是,父进程的结束时间是要等子进程结束后才结束。如果不调用此方法,运行结果如下:

    主进程已经开始启动:960
    父进程结束
    子进程启动
    子进程结束
    

    父进程的结束时间不再受子进程结束的影响。父进程可能在子进程结束之前就结束,也可能在子进程结束后才结束,具体情况要看各自进程的具体操作。本例中,如果不调用p.join()方法,但子进程有耗时操作sleep(5),而父进程中没有耗时的操作,所以父进程优先于子进程结束了。

    说明:
    1,创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。
    2.创建子进程时,切不可这样创建:p = Process(target=my_run("子进程"))。为了方便对比,将两种写法拿出来:

    # 上面例子中的创建写法
    p = Process(target=my_run, args=("子进程",))
    
    # 不推荐的创建写法
    p = Process(target=my_run("子进程"))
    

    参数的传入方式不同,上面例子中通过args=传入,形如:args=("子进程",),而错误的写法是直接通过指定target=时就一起传入,形如:target=my_run("子进程")

    错误的创建形式虽然可以创建子进程成功,但会导致:join()方法无论是否调用,都会出现进程同步的现象。即:父进程的结束时间是要等子进程结束后才结束。

    线程池Pool

    如果要启动大量的子进程,可以用进程池的方式批量创建子进程。multiprocessing模块提供了一个Pool类来代表一个进程对象。通常可导入模块:
    from multiprocessing import Pool

    Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

    下面介绍一下multiprocessing模块下的Pool类下的几个方法:

    1.apply()
    函数原型:apply(func[, args=()[, kwds={}]])

    该函数用于传递不定参数,同python中的apply函数一致,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不在出现)。

    2.apply_async
    函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])apply()用法一致,但它是非阻塞的且支持结果返回后进行回调。

    3.map()
    函数原型:map(func, iterable[, chunksize=None])

    Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回。
    注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

    4.map_async()
    函数原型:map_async(func, iterable[, chunksize[, callback]])
    与map用法一致,但是它是非阻塞的。其有关事项见apply_async

    5.close()
    关闭进程池pool,使其不在接受新的任务。

    6.terminal()
    结束工作进程,不在处理未处理的任务。

    7.join()

    主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。

    下面我们看一个简单的multiprocessing.Pool类的例子:

    from multiprocessing import Pool
    from time import time, sleep
    import random
    
    
    def perform_task(id):
        begin = time()
        print("【{}号】子进程【开始】执行任务".format(id))
        sleep(random.random() * 5)
        print("【{}号】子进程【结束】执行任务".format(id))
        end = time()
        cost = end - begin
        print("【{}号】子进程执行任务耗时:{}".format(id, cost))
    
    if __name__ == "__main__":
        pool_count = 5
        pool = Pool(pool_count)
        print("进程池准备就绪")
        print("多进程开始执行任务,等待结束...")
        for i in range(pool_count):
            pool.apply_async(perform_task, args=(i,))
        pool.close()
        pool.join()
    
        print("所有进程池中的任务完成")
    

    运行结果:

    进程池准备就绪
    多进程开始执行任务,等待结束...
    【0号】子进程【开始】执行任务
    【1号】子进程【开始】执行任务
    【2号】子进程【开始】执行任务
    【3号】子进程【开始】执行任务
    【4号】子进程【开始】执行任务
    【0号】子进程【结束】执行任务
    【0号】子进程执行任务耗时:0.3483567237854004
    【3号】子进程【结束】执行任务
    【3号】子进程执行任务耗时:1.0760245323181152
    【4号】子进程【结束】执行任务
    【4号】子进程执行任务耗时:1.3666496276855469
    【1号】子进程【结束】执行任务
    【1号】子进程执行任务耗时:3.833340644836426
    【2号】子进程【结束】执行任务
    【2号】子进程执行任务耗时:4.259188890457153
    所有进程池中的任务完成
    

    对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

    进程间通信

    Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

    我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

    from multiprocessing import Process, Queue
    import os, time, random
    
    
    # 写数据进程执行的代码:
    def write(q):
        print('Process to write: %s' % os.getpid())
        for value in ['A', 'B', 'C']:
            print('Put %s to queue...' % value)
            q.put(value)
            time.sleep(random.random())
    
    # 读数据进程执行的代码:
    def read(q):
        print('Process to read: %s' % os.getpid())
        while True:
            value = q.get(True)
            print('Get %s from queue.' % value)
    
    if __name__=='__main__':
        # 父进程创建Queue,并传给各个子进程:
        q = Queue()
        pw = Process(target=write, args=(q,))
        pr = Process(target=read, args=(q,))
        # 启动子进程pw,写入:
        pw.start()
        # 启动子进程pr,读取:
        pr.start()
        # 等待pw结束:
        pw.join()
        # pr进程里是死循环,无法等待其结束,只能强行终止:
        pr.terminate()
    

    运行结果:

    Process to write: 23192
    Put A to queue...
    Process to read: 2824
    Get A from queue.
    Put B to queue...
    Get B from queue.
    Put C to queue...
    Get C from queue.
    

    小结

    本文主要学习了python中的多进程Process及实现跨平台的多进程multiprocessing,进程池Pool、通过Queue进程间通信。


    更多了解,可关注公众号:人人懂编程


    微信公众号:人人懂编程

    相关文章

      网友评论

        本文标题:37.Python编程:多进程multiprocessing

        本文链接:https://www.haomeiwen.com/subject/dkqqmftx.html