美文网首页
day37-并发编程之多进程2

day37-并发编程之多进程2

作者: 天行_b6d0 | 来源:发表于2020-08-27 17:11 被阅读0次

    一、同步、异步和阻塞、非阻塞

    同步

    所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不会返回。按照这个定义,其实绝大多数函数都是同步调用。但是一般而言,我们在说同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务。

    举例:

    1. multiprocessing.Pool下的apply  # 发起同步调用后,就在原地等着任务结束,根本不考虑任务是在计算还是在io阻塞,总之就是一股脑地等任务结束
    2. concurrent.futures.ProcessPoolExecutor().submit(func,).result()
    3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()
    

    异步:

    异步的概念和同步相对。当一个异步功能调用发出后,调用者不能立刻得到结果。当该异步功能完成后,通过状态、通知或回调来通知调用者。如果异步功能用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低(有些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这其实是一 种很严重的错误)。如果是使用通知的方式,效率则很高,因为异步功能几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。

    举例:

    1. multiprocessing.Pool().apply_async()  # 发起异步调用后,并不会等待任务结束才返回,相反,会立即获取一个临时结果(并不是最终的结果,可能是封装好的一个对象)。
    2. concurrent.futures.ProcessPoolExecutor(3).submit(func,)
    3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)
    

    阻塞:

    阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。

    举例:

    1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);
    2. 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。

    非阻塞:

    非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。

    二、守护进程

    主进程创建守护进程(p.daemon=True):

    • 其一:守护进程会在主进程代码执行结束后就终止
    • 其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    在子进程执行start之前将其定义为守护进程。

    三、进程同步(锁)

    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

    就好比我们网上抢票一样,查票的过程是异步进行,而由于网路延迟的原因,抢票就要是同步来实现。

    模拟抢票如下:

    ticket文件(数据使用json格式写)
    {"count": 0}

    买票:

    from multiprocessing import Process, Lock
    import json
    import time
    import random
    
    
    # 查看还有几张票
    def search():
        with open("ticket", "r", encoding="utf-8") as f:
            dic = json.load(f)
        print('余票%s张' % dic["count"])
    
    
    # 买票
    def buy():
        with open("ticket", "r", encoding="utf-8") as f:
            dic = json.load(f)
        time.sleep(random.randint(1, 3))  # 模拟网络延迟
        if dic["count"] != 0:
            dic["count"] -= 1
            with open("ticket", "w", encoding="utf-8") as f:
                json.dump(dic, f)
                print("买票成功!")
        else:
            print("余票不足!")
    
    
    def buy_ticket(lock):
        search()
        lock.acquire()  # 加锁
        buy()
        lock.release()  # 解锁
    
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            t = Process(target=buy_ticket, args=(lock,))
            t.start()
    

    四、队列(先进先出)

    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

    创建队列的类(底层就是以管道和锁定的方式实现):
    Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。其中maxsize是队列中允许的最大项数,如果不写则默认是一个很大的数。

    主要方法:

    • q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

    • q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.

    • q.get_nowait():同q.get(False)

    • q.put_nowait():同q.put(False)

    • q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。

    • q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。

    • q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

    五、生产者和消费者模型

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    基于队列实现生产者消费者模型:

    import time
    import random
    from multiprocessing import Process, JoinableQueue, Queue
    
    
    def producer(name, food, q):
        for i in range(10):
            print(f"{name}制造了一个{food},一共制造了{i+1}个")
            # 模拟制造食物的延迟
            time.sleep(random.randint(1, 3))
            q.put(food)
    
    
    def consumer(name, q):
        while True:
            food = q.get()
            # 模拟消费食物的延迟
            time.sleep(random.randint(1, 3))
            print(f"{name}消费了一个{food}")
            q.task_done()  # 每task_done一次 就从队列里删掉一个元素,这样在最后join的时候根据队列长度是否为零来判断队列是否结束
    
    
    if __name__ == '__main__':
        q = JoinableQueue()  # 一个有join方法的队列
    
        # 创造两个生产者进程
        p = Process(target=producer, args=("yan", "蛋糕", q))
        p.start()
        p1 = Process(target=producer, args=("Jack", "面包", q))
        p1.start()
    
        # 创造3个消费者进程
        c = Process(target=consumer, args=("小明", q))
        c.start()
        c1 = Process(target=consumer, args=("小红", q))
        c1.start()
        c2 = Process(target=consumer, args=("小黄", q))
        c2.start()
    
        # 等所有生成者进程结束主进程再结束
        p.join()
        p1.join()
        q.join()  # 阻塞等到队列中没有值了放行
        print("消费完毕")
    

    相关文章

      网友评论

          本文标题:day37-并发编程之多进程2

          本文链接:https://www.haomeiwen.com/subject/nmdnjktx.html