美文网首页
Python-进程

Python-进程

作者: 村东头老骥 | 来源:发表于2019-07-24 00:34 被阅读0次

    Python-多进程

    1 创建一个进程

    1.1 用 Process 创建一个进程

    from multiprocessing import Process
    import time
    import os
    
    def foo(name):
        print('foo--start', os.getpid())
        time.sleep(5)
        print('%s--父进程' % name, os.getppid())  # 传入参数
        print('foo--end', os.getpid())
    
    
    if __name__ == '__main__':
        print('main--start', os.getpid())
        p = Process(target=foo, args=('task-foo',))  # 注册进行,并且传入参数
        p.start()  # 开启了一个进程
        # print('main--父进程号', os.getppid())  # 查看当前的进程的父进程
        print('main--end', os.getpid())  # 开启当前进程的进程号
    
    输出结果:
    # 子进程的父进程
    main--start 19140
    main--end 19140
    # 子进程(pid相同)
    foo--start 19141
    task-foo--父进程 19140
    foo--end 19141
    

    1.2 进程 Process 介绍

    class Process(object):
        def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
            self.name = ''
            self.daemon = False
            self.authkey = None
            self.exitcode = None
            self.ident = 0
            self.pid = 0
            self.sentinel = None
    
        def run(self):
            pass
    
        def start(self):
            pass
    
        def terminate(self):
            pass
    
        def join(self, timeout=None):
            pass
    
        def is_alive(self):
            return False
    
    
    

    1.3 进程的生命周期

    进程的生命周期主要是注意两种情况:

    • 主进程需要大量的执行时间
    • 子进程执行过程中需要大量的执行时间

    主进程(执行时间) > 子进程的(执行时间)

    def foo(name):
        print('foo--start', os.getpid())
        time.sleep(3)
        print('%s--父进程' % name, os.getppid())  # 传入参数
        print('foo--end', os.getpid())
    
    
    if __name__ == '__main__':
        print('main--start', os.getpid())
        p = Process(target=foo, args=('task-foo',))  # 注册进行,并且传入参数
        p.start()  # 开启了一个进程
        time.sleep(5)
        print('main--end', os.getpid())  # 开启当前进程的进程号
        # print('main--父进程', os.getppid())  # 查看当前的进程的父进程
    
    输出:
    main--start 19526  # 主进程执行时间过长
    foo--start 19527
    task-foo--父进程 19526
    foo--end 19527
    main--end 19526
    
    
    

    子进程(执行时间) > 主进程的(执行时间)

    from multiprocessing import Process
    import time
    import os
    
    def foo(name):
        print('子进程foo--start')
        print(name)
        time.sleep(3)
        print('子进程foo--end')
    
    
    if __name__ == '__main__':
        print('主进程开始')
        p = Process(target=foo, args=('foo',))
        p.start()
        print('主进程结束')
    
    输出结果
    # ---- 主进程 ----
    主进程开始
    主进程结束      
    # ---- 子进程 ----
    子进程foo--start
    foo
    子进程foo--end
    

    1.4 join 方法

    • 参数 timeout 默认值 None

    如果当可选参数 timeout 为默认值 None的时候,则改方法将堵塞,直到调用其方法的主进程join终止才终止.当传入参数的时候,超时则停止阻塞,顺序执行.

    from multiprocessing import Process
    import time
    import os
    
    
    def func(x, y):
        print('func--start')
        print(x, y)
        time.sleep(15)
        print('func--end')
    
    
    if __name__ == '__main__':
        print('main-start')
        p = Process(target=func, args=(1, 2))
        print('开启子进程')
        p.start()
        print('main--等待子进程返回结果')
        p.join()  # 阻塞主进程,等待一个子进程的结束而顺序执行,将异步的程序改为同步,
        print('子进程结束')
        print('main-end')
    
    # 输出结果
    main-start
    开启子进程
    main--等待子进程--foo返回结果
    func--start
    1 2
    func--end
    子进程结束
    main-end
    # 总结
    如果在主进程中没有join()的时候,不会等待子进程的结束而继续执行,但是当我们在主进程中使用了join(),则主进程会等待子进程的结束而继续向下执行.
    

    注意:

    join(timeout) 如果传入参数timeout,那么当延时超过则取消主程序的阻塞,继续向下执行.

    2 开启多进程

    2.1 创建多个进程

    import os
    import time
    from multiprocessing import Process
    
    
    def foo(x, y):
        print('foo-start--%s' % os.getpid())
        print(x, y)
        time.sleep(3)
        print('foo-end--%s' % os.getpid())
    
    
    if __name__ == '__main__':
        print('主进程开始--%s' % os.getpid())
        p = Process(target=foo, args=(1, 3))     # 注册第一个进程
        p.start()                                # 启动第一个进程
        p1 = Process(target=foo, args=(10, 10))  # 注册第一个进程
        p1.start()                               # 启动第一个进程
        print('主进程结束--%s' % os.getpid())
    
    # 输出结果
    主进程开始--20307
    主进程结束--20307
    foo-start--20308    # 第一个进程启动
    1 3
    foo-start--20309    # 第二个进程启动
    10 10
    foo-end--20309
    foo-end--20308
    

    2.2 多进程使用 join() 的方法

    def foo(x, y):
        print('foo-start--%s' % os.getpid())
        print(x, y)
        time.sleep(3)
        print('foo-end--%s' % os.getpid())
    
    
    if __name__ == '__main__':
        print('主进程开始--%s' % os.getpid())
        p = Process(target=foo, args=(1, 3))     # 注册第一个进程
        p.start()                                # 启动第一个进程
        p.join()                                        # 阻塞当前进程,等待子进程 p 执行结束继续向下执行
        p1 = Process(target=foo, args=(10, 10))  # 注册第一个进程
        p1.start()                               # 启动第一个进程
        p1.join()                                       # 同上
        print('主进程结束--%s' % os.getpid())
    
    # 输出结果
    主进程开始--20325
    foo-start--20326   # 子进程 p 开始
    1 3
    foo-end--20326     # 子进程 p 结束
    foo-start--20327   # 子进程 p1 开始
    10 10
    foo-end--20327     # 子进程 p1 结束
    主进程结束--20325
    
    # 总结
    当多进程使用了 join 无疑就是将本来异步的程序变成同步,
    但是他们是同时执行在同一时间段上实现了同时运行。
    区别于分别调用foo执行速度还是比较快的。
    

    2.3 关于多进程 join 的位置

    2.3.1 位于循环体内

    def foo(x, y):
        print(str(os.getpid()) + '--start')
        time.sleep(3)
        print(str(os.getpid()) + '--end')
    
    
    if __name__ == '__main__':
        print('主进程开始')
        for i in range(5):
            p = Process(target=foo, args=(5 * i, 5 * i))
            p.start()
            p.join()  # 等待 第一个子进程 执行回来执行第二个
        print('主进程结束')
    # 输出结果
    20387--start
    20387--end
    20388--start
    20388--end
    20389--start
    20389--end
    20390--start
    20390--end
    20391--start
    20391--end
    主进程结束
    # 简易的时序图
    
    主进程开始
        17750--start
            17750--end
                17751--start
                    17751--end
                        17752--start
                            17752--end
                                17753--start
                                    17753--end
                                        17754--start
                                            17754--end
                                                主进程结束
    --------------------------------------------------------------> t
    # 总结
        当我们把join位于循环体内的时候,实际上相当于循环体中调用函数,千万不要这样做。
    

    2.3.2 位于循环体外

    def foo(x, y):
        # print(str(os.getpid()) + '--start')
        print('*'*x)
        time.sleep(3)
        print('*' * y)
        # print(str(os.getpid()) + '--end')
    
    
    if __name__ == '__main__':
        print('主进程开始')
        for i in range(5):
            p = Process(target=foo, args=(5 * i, 5 * i))
            p.start()
        p.join()  # 此时表示最后一个进程结束后执行 print('主进程结束'),所以主进程结束会有不确定性。
        print('主进程结束')
    
    # 输出结果
    主进程开始
    ********************
    
    *****
    **********
    ***************
    
    *****
    ******************** # 最后的一个进程
    主进程结束
    **********
    ***************
    # 总结
        - 当位于循环体外的时候,实际上是等待最后的一个进程结束而向下执行,简单点说它只是阻塞最后的一个进程。当最后一个进程执行完毕依次顺序执行。当出现这种情况的时候不确定性是非常大的。
    
    

    2.3.3 循环遍历 join

    def foo(x, y):
        print(str(os.getpid()) + '--start')
        time.sleep(3)
        print(x, y)
        print(str(os.getpid()) + '--end')
    
    
    if __name__ == '__main__':
        print('主进程开始')
        p_list = []
        for i in range(5):
            p = Process(target=foo, args=(5 * i, 5 * i))
            p_list.append(p)
            p.start()
        [p.join() for p in p_list]
        print('主进程依赖所有子进程返回结果才能执行以下任务')
        print('主进程结束')
    
    # 输出结果
    主进程开始
    20416--start
    20417--start
    20418--start
    20419--start
    20415--start
    15 15
    20418--end
    10 10
    20417--end
    5 5
    20416--end
    20 20
    20419--end
    0 0
    20415--end
    主进程依赖所有子进程返回结果才能执行以下任务
    主进程结束
    # 简易图
    主进程开始
    20662--start
    20659--start
    20660--start
    20661--start
    20663--start
        20659--end
            20662--end
                20660--end
                    20661--end
                        20663--end
                            主进程结束
    ---------------------------------------------------> t
    
    

    2.3.4 使用类创建进程

    参考源码

    class Process(object):
        def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
            self.name = ''
            self.daemon = False
            self.authkey = None
            self.exitcode = None
            self.ident = 0
            self.pid = 0
            self.sentinel = None
    

    通过继承 Process 类的方式开启多进程

    import time
    import os
    from multiprocessing import Process
    
    class MyProcess(Process):
    
        def run(self):
            print(self.pid)
            print(self.name)
    
    
    if __name__ == '__main__':
        p = MyProcess()
        p.start()
        p1 = MyProcess()
        p1.start()
    
    

    那么我们应该如何传入参数那?

    class MyProcess(Process):
    
        def __init__(self,arg):
            super().__init__()  
            self.arg = arg # 传入参数
    
        def run(self):
            print(self.pid)
            print(self.name)
    
    
    if __name__ == '__main__':
        p = MyProcess(1)
        p.start()
        p1 = MyProcess(2)
        p1.start()
    # 注意
        -当我们传入参数的时候应该继承Process的__init__()
    

    2.3.5 进程之间数据隔离

    import os
    from multiprocessing import Process
    
    def foo():
        print('子进程-foo-开始')
        global n
        n = 0
        print('pid:%s' % os.getpid(), n)
        print('子进程-foo-结束')
    
    
    if __name__ == '__main__':
        print('主进程-start')
        n = 100
        p = Process(target=foo)
        p.start()
        print(os.getpid(), n)
        print('主进程-end')
    

    3 什么是守护进程

    概念:随着主进程的结束而结束

    3.1 没有开启守护进程

    import os
    import time
    from multiprocessing import Process
    
    def foo():
        print('子进程-start')
        while True:
            time.sleep(0.5)
            flag = p.is_alive()
            print('子进程-是否正在执行', flag)
        print('子进程-end')
    
    
    if __name__ == '__main__':
        print('主程序-开始')
        p = Process(target=foo)
        p.start()
        n = 0
        while n < 5:
            print('我是socket server %s' % n)
            time.sleep(1)
            n += 1
        print('主程序-结束')
    # 输出结果
    主程序-开始
        我是socket server 0
        子进程-start
        子进程-是否正在执行 True
        我是socket server 1
        子进程-是否正在执行 True
        子进程-是否正在执行 True
        我是socket server 2
        子进程-是否正在执行 True
        子进程-是否正在执行 True
        我是socket server 3
        子进程-是否正在执行 True
        子进程-是否正在执行 True
        我是socket server 4
        子进程-是否正在执行 True
        子进程-是否正在执行 True
    主程序-结束
    子进程-是否正在执行 True
    子进程-是否正在执行 True
    子进程-是否正在执行 True
    ......                           # 由于循环的原因 子进程会继续执行
    
    说明:
        - 在上面的例子中主进程结束后,那么子进程不会跟随子进程继续执行。
    

    3.2 开启守护进程 daemon

    主进程创建守护进程

    • 守护进程会在主进程代码执行结束后就终止
    • 守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
    def foo():
        print('子进程-start')
        while True:
            time.sleep(0.5)
            flag = p.is_alive()
            print('子进程-是否正在执行', flag)
        print('子进程-end')
    
    
    if __name__ == '__main__':
        print('主程序-开始')
        p = Process(target=foo)
        p.daemon = True  # 设置 p 为守护进程
        p.start()
        n = 0
        while n < 5:
            print('我是socket server %s' % n)
            time.sleep(1)
            n += 1
        print('主程序-结束')
    
    输出结果
    主程序-开始
    我是socket server 0
        子进程-start
        子进程-是否正在执行 True
    我是socket server 1
        子进程-是否正在执行 True
        子进程-是否正在执行 True
    我是socket server 2
        子进程-是否正在执行 True
        子进程-是否正在执行 True
    我是socket server 3
        子进程-是否正在执行 True
        子进程-是否正在执行 True
    我是socket server 4
        子进程-是否正在执行 True
        子进程-是否正在执行 True  # 子进程到最后直接跳出循环,随着主进程的结束而结束
    主程序-结束
    
    

    【总结】进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

    4 进程锁

    4.1 锁 Lock

    如果在进程中没有锁会引发的问题

    from multiprocessing import Process, Lock
    import os
    import time
    
    db = {
        'ticket': 1
    }
    
    
    def check_ticket():
        return print('余票:%s' % db['ticket'])
    
    
    def get_ticket(i):
        if db['ticket'] > 0:
            db['ticket'] -= 1
            print('\033[32m%s买到票了\033[0m' % i)
        else:
            print('\033[31m%s没买到票\033[0m' % i)
    
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=check_ticket)
            p.start()
        time.sleep(1)
        for i in range(10):
            p = Process(target=get_ticket, args=(i,))
            p.start()
    # 输出结果
    余票:1
    余票:1
    余票:1
    余票:1
    余票:1
    余票:1
    余票:1
    余票:1
    余票:1
    余票:1
    5买到票了
    6买到票了
    1买到票了
    2买到票了
    3买到票了
    7买到票了
    8买到票了
    9买到票了
    0买到票了
    4买到票了
    # 总结
    根据打印的输出结果我们可以看出实际上只有一张票,但是却都买到了票,针对这种情况就是对不同的任务抢占同一个资源的场景.
    

    针对上述的例子我们就要用到了锁的概念

    剩余票的数量 ticket.txt

    {"ticket": 1}
    

    对多个进程修改同一个资源进行安全处理 加锁

    
    def check_ticket(i):
        with open('/opt/base_python/library_python/_multilprocess/Lock/ticket.txt') as f:
            dic = json.load(f)
        print('余票: %s' % dic['ticket'])
    
        # return print(i,'查询余票数量:%s' % db['ticket'])
    
    
    def get_ticket(i, lock):
        lock.acquire()  # 给资源加锁
        with open('/opt/base_python/library_python/_multilprocess/Lock/ticket.txt') as f:
            dic = json.load(f)
            time.sleep(0.1)
        if dic['ticket'] > 0:
            dic['ticket'] -= 1
            print('\033[32m%s买到票了\033[0m' % i)
        else:
            print('\033[31m%s没买到票\033[0m' % i)
        time.sleep(0.1)
        with open('/opt/base_python/library_python/_multilprocess/Lock/ticket.txt', 'w') as f:
            json.dump(dic, f)
    
        lock.release()  # 释放资源锁
        
    总结:
        - 注意锁的位置。
    

    4.2 信号量 Semaphore

    概念:Lock 属于互斥锁,也就是一把钥匙配备一把锁,同时只允许锁住某一个数据。而信号量则是多把钥匙配备多把锁,也就是说同时允许锁住多个数据。

    生活中常见的例子

    '''
    在该例子中银行共计5个窗口,而实际办理业务的人员共计有20个。所以区别于锁的概念来说信号量就是基于锁的每次只能操作一个资源的扩展变成设置同时操作多个资源。
    
    '''
    
    
    from multiprocessing import Process
    from multiprocessing import Semaphore
    import os, random, time
    
    
    def bank(i, sem):  # 银行业务窗口
        sem.acquire()  # 被叫号 办理业务
        print('%s正在办理业务--start' % i)
        time.sleep(random.randint(1, 5))  # 每一次办理业务的时候 时间不定
        print('%s结束业务--end' % i)
        sem.release()
    
    
    if __name__ == '__main__':
        sem = Semaphore(4)  # 表示共有4个柜台的办理业务
        for i in range(20):  # 表示共计有20个业务员进行等待办理业务。
            p = Process(target=bank, args=(i, sem))
            p.start()
    
    '''
    1正在办理业务--start
    0正在办理业务--start
    2正在办理业务--start
    3正在办理业务--start   # 可以同时保证开启四个进程 执行任务 与 进程锁区别的是因为 进程锁只有一个 但是他可以执行多个
    0结束业务--end         
    4正在办理业务--start
    2结束业务--end
    5正在办理业务--start
    1结束业务--end
    14正在办理业务--start
    4结束业务--end
    15正在办理业务--start
    3结束业务--end
    13正在办理业务--start
    5结束业务--end
    10正在办理业务--start
    14结束业务--end
    6正在办理业务--start
    13结束业务--end
    7正在办理业务--start
    6结束业务--end
    8正在办理业务--start
    15结束业务--end
    9正在办理业务--start
    7结束业务--end
    12正在办理业务--start
    10结束业务--end
    16正在办理业务--start
    8结束业务--end
    11正在办理业务--start
    11结束业务--end
    17正在办理业务--start
    12结束业务--end
    9结束业务--end
    18正在办理业务--start
    19正在办理业务--start
    16结束业务--end
    17结束业务--end
    18结束业务--end
    19结束业务--end
    
    '''
    

    4.3 事件 Event

    常用的属性和方法的介绍

    from multiprocessing import Event
    e = Event()  # 创建事件
    e.is_set()   # 查看是否为阻塞状态 默认阻塞 False
    e.set()      # 设置 e.is_set() True 
    # 注意 e.set() 只能设置 e.is_set() 为True
    e.clear()    # 设置 e.is_set() False
    '''
    set() 和 clear() 是相对立的
    set -- True
    clear -- False
    # 示例代码
    In [3]: e.is_set()                                                                               
    Out[3]: False
    
    In [4]: e.set()                                                                                                             
    In [5]: e.is_set()                                                                                                     
    Out[5]: True
    
    In [6]: e.clear()                                                                                                           
    In [7]: e.is_set()                                                                                                      
    Out[7]: False
    '''
    e.wait()     
    # 根据 e.is_set() 决定是否执行阻塞
    #    False -- 阻塞 
    #    True 不执行阻塞
    
    
    • is_set()--False && e.wait()
    e = Event()
    print('start....')
    print(e.is_set())
    e.is_set() # False
    e.wait()   # 执行阻塞
    print('end...')
    # 输出结果
    # start....
    # False
    
    • is_set()--False && e.wait()
    e = Event()
    print('start....')
    print(e.is_set())
    e.is_set()         # False
    e.set()            # 设置为True
    print(e.is_set())  # True
    e.wait()           # 执行阻塞
    print('end...')
    # 输出结果
    # start....
    # False
    # end...
    

    【总结】:对于这个地方我总是混淆,所以写的比较详细一些。

        (初始)False ---->  阻塞 -- e.set() -- 非阻塞
            /     \      /     
    is_set()       wati()     --->
            \     /      \ 
             True  ---->  非阻塞 -- e.clear -- 阻塞
    注意区分
        is_set() 是下达命令 
        wait     是执行命令
    

    红绿灯事件

    # 红绿灯事件
    import time
    import random
    from multiprocessing import Event, Process
    
    
    def cars(e, i):
        if not e.is_set():  # if not false(阻塞状态)
            print('car%i在等待' % i)
            e.wait()  # 阻塞 当 is_set() 为 false 执行阻塞
        print('\033[0;32;40mcar%i通过\033[0m' % i)
    
    
    def light(e):
        while True:
            if e.is_set():
                e.clear()  # 将其设置为 False
                print('\033[31m红灯亮了\033[0m')
            else:
                e.set()
                print('\033[32m绿灯亮了\033[0m')
            time.sleep(2)
    
    
    if __name__ == '__main__':
        e = Event()
        traffic = Process(target=light, args=(e,))
        traffic.start()
        for i in range(20):
            car = Process(target=cars, args=(e, i))
            car.start()
            time.sleep(random.random())
    # 输出结果
    car0在等待
    绿灯亮了
    car0通过
    car1通过
    car2通过
    car3通过
    car4通过
    红灯亮了
    car5在等待
    car6在等待
    car7在等待
    绿灯亮了
    car5通过
    car6通过
    car7通过
    car8通过
    car9通过
    car10通过
    car11通过
    红灯亮了
    car12在等待
    
    
    

    5 进程之间的通信-队列

    5.1 队列的常用方法介绍

    In [2]: from multiprocessing import Queue                   
    In [3]: q = Queue(3)                                        
    In [4]: dir(q)     
     ...
     'cancel_join_thread',
     'close',        # 关闭后则不能操作 
     'empty',
     'full',
     'get',          # 注意参数 block timeout 
     'get_nowait',   # 如果队列中空则会抛出异常
     'join_thread',
     'put',          # block 默认为True 如果为False 抛异常
        # In [19]: q.full()                                       
        # Out[19]: True
        # In [20]:                                          
        # In [20]: q.put(1,block=False)  
        
     'put_nowait',   # 如果队列满了则会跑出异常
     'qsize'         # 查看队列的数量
     ....
    
    # 压入数据 put 验证是否满了 full
    In [9]: q = Queue(3)                                     
    In [10]: q.put(1)                                           
    In [11]: q.put(2)                                             
    In [12]: q.put(3)                                           
    In [13]: q.full()                                      
    Out[13]: True
    In [14]: q.put(4)  # 阻塞状态 直到队列中有值被取出
    
    
    # 取出数据 get 验证是否空了 empty
    In [6]: q.put(1)                                              
    In [7]: q.get()                  
    Out[7]: 1
    In [8]: q.empty()                                           
    Out[8]: True
    In [9]: q.get()     # 阻塞状态 直到有值压入
    

    5.2 进程中使用 Queue 数据传输

    5.2.1 主进程-子进程之间数据通信

    from multiprocessing import Queue, Process
    import time
    
    def _put(q):
        q.put('hello word')
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=_put, args=(q,))
        p.start()
        time.sleep(1)
        print(q.get())
    # 输出结果
    子进程-put
    主进程-get
    hello word
    

    5.2.1 子进程-子进程之间数据通信

    def _put(q):
        print('子进程-put')
        q.put('hello word')
    
    def _get(q):
        print('子进程-get')
        print(q.get())
    
    if __name__ == '__main__':
        q = Queue()
        p_put = Process(target=_put, args=(q,))
        p_get = Process(target=_get, args=(q,))
        p_put.start()
        p_get.start()
    # 输出结果
    子进程-get
    子进程-put
    hello word
    

    5.3 队列之生成者消费者

    5.3.1 生产者 Queue

    def _put(q, worker):
        for i in range(1, 11):
            time.sleep(1)
            ball = '%s生产的第%s个球' % (worker, i)
            q.put(ball)
    
    
    def _get(q):
        while True:
            ball = q.get()
            print('运动员购买了%s' % (ball))
    
    
    if __name__ == '__main__':
        q = Queue(20)
        p_put_1 = Process(target=_put, args=(q, 'Bob'))
        p_put_2 = Process(target=_put, args=(q, 'Kevin'))
        p_get = Process(target=_get, args=(q,))
        p_put_1.start()
        p_put_2.start()
        p_get.start()
    
    # 输出结果
    运动员购买了Bob生产的第1个球
    运动员购买了Kevin生产的第1个球
    运动员购买了Bob生产的第2个球
    运动员购买了Kevin生产的第2个球
    运动员购买了Bob生产的第3个球
    运动员购买了Kevin生产的第3个球
    运动员购买了Bob生产的第4个球
    运动员购买了Kevin生产的第4个球
    运动员购买了Bob生产的第5个球
    运动员购买了Kevin生产的第5个球
    运动员购买了Bob生产的第6个球
    运动员购买了Kevin生产的第6个球
    运动员购买了Bob生产的第7个球
    运动员购买了Kevin生产的第7个球
    运动员购买了Bob生产的第8个球
    运动员购买了Kevin生产的第8个球
    运动员购买了Kevin生产的第9个球
    运动员购买了Bob生产的第9个球
    运动员购买了Kevin生产的第10个球
    运动员购买了Bob生产的第10个球  ## 由于消费者是while..True..的循环体所以程序运行到此处的时候会产生阻塞 ##
    
    

    如何避免上述情况

    
    from multiprocessing import Queue, Process
    import time, random
    
    
    def _put(q, worker):
        for i in range(1, 11):
            time.sleep(1)
            ball = '%s生产的第%s个球' % (worker, i)
            q.put(ball)
    
    
    def _get(q, player):
        while True:
            ball = q.get()
            if ball is None:
                break
            print('运动员%s购买了%s' % (player, ball))
            time.sleep(1)
    
    
    if __name__ == '__main__':
        q = Queue(20)
        # 生产者
        p_put_1 = Process(target=_put, args=(q, 'Bob'))
        p_put_2 = Process(target=_put, args=(q, 'Kevin'))
        # 消费者
        p_get_1 = Process(target=_get, args=(q, 'xx'))
        p_get_2 = Process(target=_get, args=(q, 'oo'))
    
        p_put_1.start()
        p_put_2.start()
        p_get_1.start()
        p_get_2.start()
    
        p_put_1.join()  # 主进程等待生产者生产完毕
        p_put_2.join()  # 主进程等待生产者生产完毕
        q.put(None)  # 当有多个消费者的时候,此时需要压入对应的None
        # 原因是在生产者和消费者模型中,当有多个进程的时候只有一个数据的时候此时谁先拿到优先拥有操作权
        # 在该例子中我们有两个消费者,如果不压入 两个 None 值则会导致其中一个跳出循环,其余没有拿到None
        # 进程依然会阻塞。
        q.put(None)
        # 为什么不使用
        # q.empty() q.qsize() q.get_nowait() 等方法
        # 主要是因为压入数据和获取数据中间存在时间差,那么在该时间差内队列中可能是空的。当消费者进行统计的时候并反馈的时候,
        # 可能在反馈结果的时候期间又有新的数据压入。导致结果不可靠
    
    

    5.3.2 生成者消费者中的 JoinableQueue

    from multiprocessing import JoinableQueue, Process
    import time, random
    
    
    def _put(q, worker):
        for i in range(1, 11):
            time.sleep(1)
            ball = '%s生产的第%s个球' % (worker, i)
            q.put(ball)
        q.join()  #
    
    
    def _get(q, player):
        while True:
            ball = q.get()
            print('运动员%s购买了%s' % (player, ball))
            time.sleep(1)
            q.task_done()  #
    
    
    if __name__ == '__main__':
        q = JoinableQueue(20)
        # 生产者
        p_put_1 = Process(target=_put, args=(q, 'Bob'))
        p_put_2 = Process(target=_put, args=(q, 'Kevin'))
        # 消费者
        p_get_1 = Process(target=_get, args=(q, 'xx'))
        p_get_2 = Process(target=_get, args=(q, 'oo'))
    
        p_put_1.start()
        p_put_2.start()
    
        # 1 设置消费者 为 守护进程
        p_get_1.daemon = True
        p_get_2.daemon = True
    
        p_get_1.start()
        p_get_2.start()
    
        p_put_1.join()  # 主进程等待生产者生产完毕
        p_put_2.join()  # 主进程等待生产者生产完毕
    
    # 1 第一步设置消费者为守护进程
    # 1.1 设置消费者等待
    #   p_put_1.join()  # 主进程等待生产者生产完毕
    #   p_put_2.join()  # 主进程等待生产者生产完毕
    # 总结:
    #     使用JoinQueue的时候的效果
    #        1. 生产者阻塞设置join后,表示生产完数据就立即执行主代码程序
    #        2. 但是在队列中设置了 JoinableQueue 的 join 表示 等待消费者彻底消费完才算结束。
    #        3. 当生产者经历了 第一轮的 队列 JoinableQueue 的 join 此时消费者已经完成了全部数据的处理
    #        4. 当生产者经历了 第二轮的 进程 Process 的 join 完成主进程的后续代码此时消费者是守护进程 随其一起结束
    #
    # 2 监控消费者是否将队列中的数据全出处理完成 调用task_done()方法
    # 3 队列 使用 join 的方法使得生产完数据后等待消费者彻底处理完数据处理
    # 总结 JoinableQueue 的两个阻塞
    # 1 第一个阻塞 来自 JoinableQueue 使用task_done()查看队列是否完成等待其彻底完成。
    # 2 第二个阻塞 来自 Process 使用将 消费者设置守护进程,此时消费者已经彻底的完成了任务,处理了全部队列中的数据, 一同结束该次任务。
    
    

    【总结】

    1. 第一个阻塞JoinableQueue 使用task_done()判断队列是否完成,如果没有完成等待其彻底完成。

    2. 第二个阻塞 Process将消费者设置守护进程(此时消费者已经彻底的完成了任务,处理了全部队列中的数据) 一同结束该次任务。

    3. 延长生产者的阻塞周期,使消费者彻底完成队列数据处理,达到供需平衡。

    6 管道 Pipe

    6.1 管道的基本认识

    In [1]: from multiprocessing import Pipe
    # left --- right # 
    In [2]: left,right = Pipe()                     
    In [3]: left.send('basketball')                             
    In [4]: right.recv()             
    Out[4]: 'basketball'
    # right --- left #
    In [5]: right.send('basketball')                            
    In [6]: left.recv()                                         
    Out[6]: 'basketball'
    

    6.2 进程中使用管道

    from multiprocessing import Pipe, Process
    
    def foo(children):
        children.send('给我转生活费')
    
    if __name__ == '__main__':
        children, father = Pipe()
        Process(target=foo, args=(children,)).start()
        print(father.recv())
    

    6.3 管道引发异常 EOFError

    from multiprocessing import Process, Pipe
    
    def foo(left, right):
        left.close()
        while True:
            try:
                res = right.recv()
                print(res)
            except EOFError:
                right.close()
                break
    
    if __name__ == '__main__':
        left, right = Pipe()
    
        Process(target=foo, args=(left, right)).start()
        right.close()
        for i in range(20):
            left.send('basketball-%s' % i)
        left.close()
    
    # 总结:
    主进程中和子进程中如果对管道关闭互相不受影响。
        left(close)     主进程      right(close)
            \-----------------------/
                                
            **********Pipe*************
    
            /-----------------------\
        left(close)      子进程    right(异常EOFError)
    
    

    6.4 管道进程之资源抢占 Lock

    7 进程池

    进程池与信号量的区别

    假设我们有100个任务

    • 信号量开启一百个进程,每次只能对指定数量的任务进行处理,执行完的进程立即销毁,进程等待任务
    • 进程池开启指定数量的进程池,任务等待进程去执行,每个进程执行完当前的任务在去执行下一个任务,达到了进程的循环使用,节约了资源.

    7.1 进程池的性能比较

    from multiprocessing import Pool, Process
    import time
    
    def foo(n):
        for i in range(100):
            print(n + 1)
    
    if __name__ == '__main__':
        start = time.time()
        pool = Pool(5)
        pool.map(foo, range(100))
        t = time.time() - start
        start = time.time()
        p_lst = list()
        for i in range(100):
            p = Process(target=foo, args=(i,))
            p_lst.append(p)
            p.start()
        for p in p_lst: p.join()
        tt = time.time() - start
        print(t, tt)
    # 输出结果
    100
    100
    100
    100
    0.11333441734313965 0.30838918685913086
    
    

    7.2 进程池的同步与异步提交

    7.2.1 进程池的同步提交

    from multiprocessing import Process, Pool
    import time, os
    
    def foo(n):
        print('foo-start-%s' % n, os.getpid())
        time.sleep(1)
        print('foo-end-%s' % n, os.getpid())
    
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(10):
            p.apply(foo, args=(i,))
        p.close()  # 结束进程池接收任务,才能使用join进行感知任务彻底执行完毕.
        p.join()  # 等待进程池中的任务彻底执行结束
        # 进程池中的进程一直都会处于活跃的状态,只有任务才会被执行完毕.
    # 输出结果
    foo-start-0 20243
    foo-end-0 20243
    foo-start-1 20244
    foo-end-1 20244
    foo-start-2 20245
    foo-end-2 20245
    foo-start-3 20246
    foo-end-3 20246
    foo-start-4 20242
    foo-end-4 20242
    foo-start-5 20243
    foo-end-5 20243
    foo-start-6 20244
    foo-end-6 20244
    foo-start-7 20245
    foo-end-7 20245
    foo-start-8 20246
    foo-end-8 20246
    foo-start-9 20242
    foo-end-9 20242
    
    

    7.2.2 进程池的异步提交

    from multiprocessing import Process, Pool
    import time, os
    
    def foo(n):
        print('foo-start-%s' % n, os.getpid())
        time.sleep(1)
        print('foo-end-%s' % n, os.getpid())
    
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(10):
            # p.apply(foo, args=(i,))
            p.apply_async(foo, args=(i,))
        p.close()  # 结束进程池接收任务,才能使用join进行感知任务彻底执行完毕.
        p.join()  # 等待进程池中的任务彻底执行结束
        # 进程池中的进程一直都会处于活跃的状态,只有任务才会被执行完毕.
    # 输出结果
    foo-start-0 20260
    foo-start-1 20257
    foo-start-2 20258
    foo-start-3 20259
    foo-start-4 20256 
    ### 同时开启了5个进程 ###
    foo-end-0 20260
    foo-start-5 20260
    foo-end-1 20257
    foo-start-6 20257
    foo-end-2 20258
    foo-start-7 20258
    foo-end-3 20259
    foo-start-8 20259
    foo-end-4 20256
    foo-start-9 20256
    foo-end-5 20260
    foo-end-6 20257
    foo-end-9 20256
    foo-end-8 20259
    foo-end-7 20258
    

    7.3 进程池的返回值

    ### 同步提交
    from multiprocessing import Pool
    import os
    
    def foo(i):
        return i * i
    
    if __name__ == '__main__':
        print('主进程-start-%s' % os.getpid())
        p = Pool(5)
        for i in range(10):
            res = p.apply(foo, args=(i,)) # apply的结果就是foo的返回值
            print(res)
        print('主进程-end-%s' % os.getpid())
    # 输出结果
    主进程-start-34980
    0
    1
    4
    9
    16
    主进程-end-34980
    ### 异步提交
    def foo(i):
        return i * i
    
    
    if __name__ == '__main__':
        print('主进程-start-%s' % os.getpid())
        p = Pool(5)
        for i in range(10):
            res = p.apply_async(foo, args=(i,))
            print(res.get()) # 阻塞等待输出
        print('主进程-end-%s' % os.getpid())
    
    主进程-start-33024
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81
    主进程-end-33024
    
    # 总结:
        - p.apply_async(foo, args=(i,))为异步提交.
        - 但是 res.get() 等待结果为同步.换句话说可能都已经执行完毕但是等待结果需要阻塞输出.
    
    # 异步执行
    if __name__ == '__main__':
        print('主进程-start-%s' % os.getpid())
        p = Pool(5)
        rlt = []
        for i in range(10):
            # res = p.apply(foo, args=(i,))
            res = p.apply_async(foo, args=(i,))
            rlt.append(res)
        for res in rlt:
            print(res.get())
        print('主进程-end-%s' % os.getpid())
    # map执行异步
    if __name__ == '__main__':
        print('主进程-start-%s' % os.getpid())
        p = Pool(5)
        ret = p.map(foo,range(10))
        print(ret)
        print('主进程-end-%s' % os.getpid())
    
    # 输出结果
    主进程-start-29644
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    主进程-end-29644
    
    

    7.4 进程池的回调函数

    from multiprocessing import Pool
    import os
    
    def foo(n):
        print('foo-running-%s' % os.getpid())
        return n * n
    
    def bar(x):
        print('bar-running-%s' % os.getpid())
        print(x)
    
    if __name__ == '__main__':
        print('主进程-%s' % os.getpid())
        p = Pool(5)
        # for i in range(10):
        p.apply_async(foo, args=(10,), callback=bar)
        p.close()
        p.join()
    
    # 回调函数
    主进程-37752
    foo-running-12472
    bar-running-37752
    100
    

    7.5 进程之间的数据共享

    from multiprocessing import Manager, Process, Lock
    
    def bar(d, lock):
        lock.acquire()
        d['count'] -= 1
        lock.release()
    
    if __name__ == '__main__':
        m = Manager()
        l = Lock()
        d = m.dict({'count': 100})
        lst = []
        for i in range(50):
            p = Process(target=bar, args=(d, l))
            p.start()
            lst.append(p)
        for i in lst:
            i.join()
        print('主进程', d)
    
    

    进程池开启soket-server端

    相关文章

      网友评论

          本文标题:Python-进程

          本文链接:https://www.haomeiwen.com/subject/coazlctx.html