美文网首页
Python多线程(四):生产者消费者问题

Python多线程(四):生产者消费者问题

作者: NWKYEKJ | 来源:发表于2019-03-15 10:22 被阅读0次

    上一篇:

    生产者消费者问题是多线程中一个很经典并发协作的问题,这个问题主要包含两类线程,一个是生产者用于生产数据,另一个是消费者用于消费数据,两者操作同一个数据共享区域,这种模型在编程中非常常见,比如爬虫,生产者负责爬取链接,消费者负责解析链接所指向的网页内容。这种模型需要满足下面的两个特征:

    • 消费者在数据共享区域为空时阻塞,直到共享区域出现新数据。
    • 生产者在数据共享区域满时阻塞,直到数据共享区出现空位。

    下面是一个简单的例子:

    import threading
    import time
    import random
    MAX_BUFF_LEN = 5
    
    buff = []
    lock = threading.Lock()
    
    class Producer(threading.Thread):
        def run(self):
            global buff
            while True:
                lock.acquire()
                if len(buff) < MAX_BUFF_LEN:
                    # 如果共享区域未满,生产数据
                    num = random.uniform(0, 5)
                    buff.append(num)
                    print('生产者向共享区域加入%f' % num)
                    lock.release()
                time.sleep(random.uniform(0, 10))
    
    class Consumer(threading.Thread):
        def run(self):
            global buff
            while True:
                lock.acquire()
                if buff:
                    # 如果共享区非空,消费数据
                    num = buff.pop(0)
                    print('消费者消费掉%f' %num)
                    lock.release()
                time.sleep(random.uniform(0, 10))
    
    producer = Producer()
    consumer = Consumer()
    producer.setDaemon(True)
    consumer.setDaemon(True)
    try:
        producer.start()
        consumer.start()
        producer.join()
        consumer.join()
    except KeyboardInterrupt:
        print('程序强制结束!')
    

    程序运行结果如下:

    生产者向共享区域加入1.653411
    消费者消费掉1.653411
    生产者向共享区域加入2.176285
    生产者向共享区域加入4.727504
    生产者向共享区域加入3.053323
    消费者消费掉2.176285
    生产者向共享区域加入0.951072
    消费者消费掉4.727504
    ^C程序强制结束!
    

    在程序中设置两个进程为守护进程,并捕捉KeyboardInterrupt错误,一旦捕捉到就结束主线程,同时结束两个子线程。上面是一个生产者消费者模型的一个简单实现,通过共享变量的方式使两个线程互相通信来达成一致。共享变量是线程间通信的常用方法,只要记得在对共享变量进行操作时加锁,程序就不会有问题。

    但是上面的代码也有问题,在于这种代码通过无限对共享变量访问的方式进行判断空还是满,这样也降低了效率。因为其中一个程序在明明知道buff满了或者空了的情况下还要进行无意义的循环,由于GIL机制,它会和其他线程争夺执行权。如果某一方在判断buff满了或者空了的情况下主动阻塞,直到另外一方通知它,它才恢复,这样就能最大化的效率。

    Python中threading中的Condition类就是来帮助我们完成这件事的。它的waitnotify方法能够阻塞和通知一个线程,下面还是通过例子来了解一下:

    import threading
    import time
    import random
    MAX_BUFF_LEN = 5
    
    buff = []
    condition = threading.Condition()
    
    class Producer(threading.Thread):
        def run(self):
            global buff
            while True:
                condition.acquire()
                if len(buff) < MAX_BUFF_LEN:
                    # 如果共享区域未满,生产数据
                    num = random.uniform(0, 5)
                    buff.append(num)
                    print('生产者向共享区域加入%f' % num)
                    condition.notify()
                else:
                    # 如果共享区满,停止生产
                    print('共享区满,生产者阻塞!')
                    condition.wait()
                condition.release()
                time.sleep(random.uniform(0, 10))
    
    class Consumer(threading.Thread):
        def run(self):
            global buff
            while True:
                condition.acquire()
                if buff:
                    # 如果共享区非空,消费数据
                    num = buff.pop(0)
                    print('消费者消费掉%f' %num)
                    condition.notify()
                else:
                    # 如果共享去空,停止消费
                    print('共享区空,消费者阻塞!')
                    condition.wait()
                condition.release()
                time.sleep(random.uniform(0, 10))
    
    producer = Producer()
    consumer = Consumer()
    producer.setDaemon(True)
    consumer.setDaemon(True)
    try:
        producer.start()
        consumer.start()
        producer.join()
        consumer.join()
    except KeyboardInterrupt:
        print('程序强制结束!')
    

    程序结果:

    生产者向共享区域加入0.040350
    消费者消费掉0.040350
    共享区空,消费者阻塞!
    生产者向共享区域加入3.266167
    消费者消费掉3.266167
    生产者向共享区域加入3.468917
    ^C程序强制结束!
    

    上面的代码中,acquire方法实际上是获得锁,wait方法将线程阻塞,实际上是将锁释放。当一个线程调用notify方法时,另一个线程就被唤醒,但是这时候这个线程并没有调用wait或者release方法释放锁,因此另一个线程虽然醒过来了但是还是没有执行,直到这个线程将锁释放。

    在使用共享变量的时候,需要时刻注意是否线程安全,非常不方便。好在是Python中提供了一个Queue类,它是线程安全的,有了它我们可以把注意力放在如何实现代码逻辑上,而不是过多的注意到线程安全上。在Python2.7中该模块名为Queue,而在Python3.6中该模块名为queue。使用Queue类改进的代码如下:

    import threading
    import time
    import random
    from queue import Queue
    
    MAX_BUFF_LEN = 5
    
    buff = Queue(MAX_BUFF_LEN)
    condition = threading.Condition()
    
    class Producer(threading.Thread):
        def run(self):
            global buff
            while True:
                num = random.uniform(0, 5)
                buff.put(num)
                print('生产者向共享区域加入%f' % num)
                time.sleep(random.uniform(0, 10))
    
    class Consumer(threading.Thread):
        def run(self):
            global buff
            while True:
                num = buff.get()
                print('消费者消费掉%f' %num)
                time.sleep(random.uniform(0, 10))
    
    producer = Producer()
    consumer = Consumer()
    producer.setDaemon(True)
    consumer.setDaemon(True)
    try:
        producer.start()
        consumer.start()
        producer.join()
        consumer.join()
    except KeyboardInterrupt:
        print('程序强制结束!')
    

    Queue是一个FIFO队列,它的get方法和put方法分别是入队和出队,在入队和出队时获取了锁以保证线程安全,如果队列空或者满,默认情况下get方法和put方法自动阻塞。阻塞和唤醒的方式实质上是调用了Condition类的waitnotify方法。Queue类比较简单,推荐大家直接查看源码或者官方文档。

    这里还有一篇写得非常好的博客,推荐大家去看看:Producer-consumer problem in Python

    相关文章

      网友评论

          本文标题:Python多线程(四):生产者消费者问题

          本文链接:https://www.haomeiwen.com/subject/vinbpqtx.html