美文网首页
生产者和消费者模型-队列

生产者和消费者模型-队列

作者: 断尾壁虎V | 来源:发表于2019-01-02 13:47 被阅读0次

    队列(进程通信ipc)

    队列主要用于解决进程间通信的问题,队列底层就是通过管道和锁的方式实现的。

    代码示例:

    from multiprocessing import Queue
    import time
    
    q=Queue(3)    # 指定队列的长度
    
    #队列相关的操作方法
    # put,get,put_nowait,get_nowait,full,empty
    q.put(3)      # 向队列中存放数据,可以是任何类型的数据
    q.put(3)
    q.put(3)
    print(q.full())   # 如果队列满,则返回 True, 否则返回 False
    
    print(q.get())    # 依次取出数据
    print(q.get())
    print(q.get())
    print(q.empty())  # 如果队列为空,则返回True,否则返回 False
    

    主要方法

    • q.put(): 用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
    • q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常
    • q.get_nowait():同q.get(False)
    • q.put_nowait():同q.put(False)
    • q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目
    • q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
    • q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

    线程Queue

    同进程队列一样,线程也有对于的方法,叫做线程Queue.

    import queue
    
    q=queue.Queue(3)   # 队列:先进先出,指定队列的大小
    
    q.put(1)  # 向队列中放入数据
    q.put(2)
    q.put(3)
    print(q.get()) # 从队列中取出数据
    # q.put(4)    # 当队列满后会等待有空闲位置时再放入
    # q.put_nowait(4)  # 立即放入数据,不等待,如果队列已经满,则会报错。
    
    q.put(4,block=False) # 与put_nowait()方法一样,设置不等待,直接放入
    q.put(4,block=True,timeout=3)   # 等待,且超时时间为3s
    
    
    

    优先级队列:

    import queue
    
    q=queue.PriorityQueue(3)  # 优先级队列
    
    q.put((10,'a')) # 指定优先级,数字越小,优先级越高
    q.put((-3,'b'))
    q.put((100,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    # 输出结果:
    (-3, 'b')
    (10, 'a')
    (100, 'c')
    
    

    堆栈,后进先出:

    import queue
    
    q=queue.LifoQueue(3) # 堆栈:后进先出
    q.put(1)
    q.put(2)
    q.put(3)
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    

    输出:

    3
    2
    1
    

    生产者和消费者模型

    为了避免死锁问题,能够解耦合,定义了生产者消费者模型。生产者只需要创造数据,然后将数据放入队列,消费者则从队列中取出数据,对数据进行消费。
    下面是使用多进程实现了简单的生产者和消费者模型:

    from multiprocessing import Process,Queue
    import random
    import time
    
    def producer(name,food,q):
        for i in range(10):
            res='%s%s' %(food,i)
            time.sleep(random.randint(1,3))
            q.put(res)
            print("厨师[%s]生产了<%s>" %(name,res))
    
    def consumer(name,q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('吃货[%s]吃了<%s>' %(name,res))
    
    if __name__=='__main__':
        q=Queue()
        p1=Process(target=producer,args=('andy','包子',q))
        c1=Process(target=consumer,args=('bob',q))
    
        p1.start()
        c1.start()
        print('主进程')
    

    在实际的应用中,可能会有多个生产者和消费者,而且我们必须保证在生产者已经生产完数据,并且消费者消费完数据后程序正常退出,所以这里需要使用到JoinableQueue模块。

    from multiprocessing import Process,JoinableQueue   # 导入可以使用join方法的模块
    import random
    import time
    
    def producer(name,food,q):
        for i in range(3):
            res='%s%s' %(food,i)
            time.sleep(random.randint(1,3))
            q.put(res)
            print("厨师[%s]生产了<%s>" %(name,res))
    
    def consumer(name,q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('吃货[%s]吃了<%s>' %(name,res))
            q.task_done()    # 通过使用队列的task_done方法,通知每一次从队列取出的信息
    
    if __name__=='__main__':
        q=JoinableQueue()
        p1=Process(target=producer,args=('andy','包子',q))
        p2=Process(target=producer,args=('Tom','包子',q))
    
        c1=Process(target=consumer,args=('bob',q))
        c2=Process(target=consumer,args=('Lucy',q))
        c3=Process(target=consumer,args=('David',q))
        c1.daemon=True     # 设置为守护进程,当主进程运行完毕时,此子进程也退出
        c2.daemon=True
        c3.daemon=True
    
        p1.start()
        p2.start()
        c1.start()
        c2.start()
        c3.start()
    
        p1.join()     # 等待生产子进程运行结束
        p2.join()
        q.join()      # 等待队列为空 后结束主进程
        print('主进程')
    
    
    
    

    说明:

    • JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

    相关文章

      网友评论

          本文标题:生产者和消费者模型-队列

          本文链接:https://www.haomeiwen.com/subject/gyrzlqtx.html