美文网首页PythonPython学习
Python并发编程——多进程

Python并发编程——多进程

作者: xiaogp | 来源:发表于2021-05-16 17:38 被阅读0次

    摘要:Python多进程

    进程是操作系统中具体的处理任务,米一个进程都会有自己独立的内存空间,它是线程的载体。当一个程序启动时,会默认启动一个进程,将该进程装载到内存,同时在该进程中还会默认启动一个线程,来执行本进程中的内容。


    创建多进程程序

    在Python中进程的相关实现被封装在模块multiprocessing中,进程的创建方法和线程一样有两种

    • 直接通过multiprocessing中的进程类Process创建
    • 继承multiprocessing的Process类,重写run方法

    类似于线程创建,对Process类实例化时直接将进程的处理函数传入即可,通过target来接收处理函数,创建好进程后通过实例化对象的start方法将其启动

    (1)使用类实例化方法创建进程

    定义进程处理函数 run_proc 。在实例化 Process 时 ,将 run_proc 传入 。接着使用实例化对象
    start 方法启动进程,然后调用join代表该子进程结束之后主进程才能结束

    import time
    import os
    from multiprocessing import Process
    
    
    def run_proc(sid):
        print("{} start, pid:{}".format(sid, os.getpid()))
        time.sleep(3)
        print("{} end, pid:{}".format(sid, os.getpid()))
    
    
    if __name__ == '__main__':
        p1 = Process(target=run_proc, args=("a", ))
        p2 = Process(target=run_proc, args=("b", ))
        print("main process: {}".format(os.getpid()))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        print("main process end...")
    

    输出如下,课件主进程和子进程的进程号pid都不一样,在调用join之后主进程阻塞直到所有子进程结束

    main process: 12611
    a start, pid:12612
    b start, pid:12613
    a end, pid:12612
    b end, pid:12613
    main process end...
    
    (2)使用继承类的方式创建进程

    实现自定义进程类使其继承于系统进程类 Process 。重写类中的 run 方法,

    import time
    import os
    from multiprocessing import Process
    
    
    def run_proc(sid):
        print("{} start, pid:{}".format(sid, os.getpid()))
        time.sleep(3)
        print("{} end, pid:{}".format(sid, os.getpid()))
    
    
    class MyProcess(Process):
        def __init__(self, sid):
            super().__init__()
            self.sid = sid
    
        def run(self):
            run_proc(self.sid)
    
    
    if __name__ == '__main__':
        p1 = Process(target=run_proc, args=("a",))
        p2 = Process(target=run_proc, args=("b",))
        print("main process: {}".format(os.getpid()))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        print("main process end...")
    

    输出如下,和直接实例化Process类输出效果一致

    main process: 14859
    a start, pid:14860
    b start, pid:14861
    a end, pid:14860
    b end, pid:14861
    main process end...
    

    创建进程池

    当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候就要编写自己的线程池/进程池,在Python只有两个模块实现进程池,分别是concurrent.futuresmultiprocessing.Pool

    (1)multiprocessing.Pool

    传递给Pool一个参数设置进程池内的最大进程数,Pool有多个方法,主要是applyapply_asyncmapmap_asyncstarmapstarmap_async,区别如下

    • apply:单次同步执行,每次执行传入一个执行函数的参数,并且执行完毕才能执行下一个进程,如果执行函数有返回值返回最后一个执行完进程的值
    • apply_async:单次启动一个任务,传入一个执行函数的参数,异步执行,启动后不等这个进程结束又开始执行新任务,如果执行函数有返回值返回最后一个执行完进程的对象,需要调用get获得结果值
    • map:执行一个任务列表,传入执行函数的参数一个参数列表,同步执行,即主进程阻塞直到任务列表中的任务全部执行完毕,如果执行函数有返回值,返回一个结果列表,顺序和输入列表的元素顺序一致
    • map_async:执行一个任务列表,不需要等待任务列表中的任务执行完毕,主进程可以继续往下执行,如果执行函数有返回值,返回一个结果对象,需要调用get方法得到输出值,顺序和输入列表的元素顺序一致
    • starmap:和map执行类似,区别是map对执行函数只能传入一个参数,而startmap可以传入多个参数,如果执行函数有返回值,返回一个结果列表,顺序和输入列表的元素顺序一致
    • starmap_async:和map_async执行类似,区别是map_async对执行函数只能传入一个参数,而starmap_async可以传入多个参数,如果执行函数有返回值,返回一个结果对象,需要调用get方法得到输出值,顺序和输入列表的元素顺序一致

    除此之外,在执行完毕多进程任务后,调用close关闭Process对象,释放与之关联的所有资源,调用join阻塞主进程,直到调用join的子进程执行完毕再退出

    下面用代码测试说明

    1. apply
    import time
    import datetime
    import os
    from multiprocessing import Pool
    
    
    def run_proc(sid):
        time.sleep(3)
        print("{} end, pid:{}, time:{}".format(sid, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
        return str(sid)
    
    
    if __name__ == '__main__':
        pool = Pool(4)
        jobs = ["a", "b", "c", "d"]
        for i in jobs:
            pool.apply(run_proc, i)
        pool.close()
        pool.join()
    

    输出如下,可见apply每次只输入一个执行函数的参数,即每次执行一次,多个参数组成的任务列表只能一个接着一个执行

    a end, pid:25260, time:2021-05-16 18:15:08
    b end, pid:25261, time:2021-05-16 18:15:11
    c end, pid:25262, time:2021-05-16 18:15:14
    d end, pid:25263, time:2021-05-16 18:15:17
    

    2.apply_async
    直接修改Pool的apply方法改为apply_assync

    if __name__ == '__main__':
        pool = Pool(4)
        jobs = ["a", "b", "c", "d"]
        for i in jobs:
            pool.apply_async(run_proc, i)
        print("main process")
        pool.close()
        pool.join()
    

    输出如下,可见达到了并行效果,进程之间执行任务互补阻塞,主进程也没有被阻塞,main process被提前打印了出来

    main process
    a end, pid:25721, time:2021-05-16 18:17:33
    b end, pid:25722, time:2021-05-16 18:17:33
    c end, pid:25723, time:2021-05-16 18:17:33
    d end, pid:25724, time:2021-05-16 18:17:33
    

    3.map
    修改Pool的方法为map

    if __name__ == '__main__':
        pool = Pool(4)
        jobs = ["a", "b", "c", "d"]
        pool.map(run_proc, jobs)
        print("main process")
        pool.close()
        pool.join()
    

    输出如下,可见map达到了并行的效果,但是主进程被阻塞,即map执行完任务列表后,主进程才执行

    a end, pid:26939, time:2021-05-16 18:21:43d end, pid:26942, time:2021-05-16 18:21:43
    
    b end, pid:26940, time:2021-05-16 18:21:43
    c end, pid:26941, time:2021-05-16 18:21:43
    main process
    

    4.map_async

    if __name__ == '__main__':
        pool = Pool(4)
        jobs = ["a", "b", "c", "d"]
        res = pool.map_async(run_proc, jobs)
        print("main process...")
        pool.close()
        pool.join()
        print(res.get())
    

    输出如下,可见效果了map类似,区别是主进程没有被阻塞,main process被提前打印了出来

    main process...
    c end, pid:21401, time:2021-05-16 21:41:54
    b end, pid:21400, time:2021-05-16 21:41:54a end, pid:21399, time:2021-05-16 21:41:54
    
    d end, pid:21402, time:2021-05-16 21:41:54
    ['a', 'b', 'c', 'd']
    

    5.starmap
    将执行函数的入参改为两个,Pool函数改为starmap,并且传入的任务列表的每个参数也改为两个

    def run_proc(sid, other):
        time.sleep(3)
        print("{} end, other:{}, pid:{}, time:{}".format(sid, other, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
        return str(sid) + str(other)
    
    
    if __name__ == '__main__':
        pool = Pool(4)
        jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
        pool.starmap(run_proc, jobs)
        print("main process")
        pool.close()
        pool.join()
    

    输入如下,执行函数正确的接收了两个参数,主进程阻塞

    b end, other:r, pid:27642, time:2021-05-16 18:24:52
    a end, other:b, pid:27641, time:2021-05-16 18:24:52
    c end, other:d, pid:27643, time:2021-05-16 18:24:52
    d end, other:a, pid:27644, time:2021-05-16 18:24:52
    main process
    

    看一个map是否支持两个参数

    if __name__ == '__main__':
        pool = Pool(4)
        jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
        pool.map(run_proc, jobs)
        print("main process")
        pool.close()
        pool.join()
    

    直接报错,执行函数缺少参数

    TypeError: run_proc() missing 1 required positional argument: 'other'
    

    6.starmap_async

    if __name__ == '__main__':
        pool = Pool(4)
        jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
        res = pool.starmap_async(run_proc, jobs)
        print("main process")
        pool.close()
        pool.join()
        print(type(res.get()))
    

    输出如下,可见和starmap相比,主进程没有被阻塞,其他一样

    main process
    a end, other:b, pid:17123, time:2021-05-16 21:28:30
    b end, other:r, pid:17124, time:2021-05-16 21:28:30
    d end, other:a, pid:17126, time:2021-05-16 21:28:30
    c end, other:d, pid:17125, time:2021-05-16 21:28:30
    <class 'list'>
    

    多线程和多进程的区别

    多进程和多线程都可以用并行机制来提升系统的运行效率,二者的区别在于运行时所占的内存分布不同

    • 多线程是共用一套内存的代码块区间
    • 多进程是各用一套独立的内存区间

    在大型计算机集群系统中,都会将多进程程序分布运行在不同的计算机上协同工作,而每一台机器上的进程内部,又会由多个线程来并行工作

    相关文章

      网友评论

        本文标题:Python并发编程——多进程

        本文链接:https://www.haomeiwen.com/subject/iqdrjltx.html