在线程世界⾥,⽣产者就是⽣产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果⽣产者处理速度很快,⽽消费者处理速度很慢,那么⽣产者就必须等待消费者处理完,才能继续⽣产数据。同样的道理,如果消费者的处理能⼒⼤于⽣产者,那么消费者就必须等待⽣产者。为了解决这个问题于是引⼊了⽣产者和消费者模式。
⽣产者消费者模式是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。
比如,对于同时爬取多个网页的多线程爬虫,在某一时刻你可能无法保证他们在处理不同的网站,在某些时刻他们极有可能在处理相同的网站,这岂不浪费?为了解决这个问题,可以将不同网页的url放在queue中,然后多个线程来读取queue中的url进行解析处理,而queue只允许一次出一个,出一个少一个。相同网站上不同网页的url通常有某种规律,比如某个字段的数字加1,这种情况完全可以用这种模式,“生产者程序”负责根据规律把完整的url制作出来,再塞进queue里面(如果queue满了,则等待);“消费者程序(网页解析程序)”从queue的后面挨个取出url进行解析(如果queue里面是空的,则等待),即使是多线程也能保证每个线程得到的是不同的url。这个过程中,生产者和消费彼此互不干涉。
下面以实例说明如何将queue与多线程相结合形成所谓的“生产者+消费者”模式,同时解决多线程如何退出的问题(注意下例中是“一个生产者+多个消费者”的形式,多生产者+多消费者的模式可在此基础上进一步实现):
import threading
import time
import queue
def producer(name,q,data): #生产者,从data里面取数据,塞入队列q,如果q已满,则等待
for i in data:
q.put(i)
print('Producer {} put {}; '.format(name,i))
time.sleep(1)
q.put(None) #将None放入queue作为标记给生产者用
def consumer(name,q): #消费者,从q里拿出数据并处理
while True: #通过while的方式推动生产者不断尝试从queue中取数据
get_result = q.get() #从队列里挨个取出数据
q.task_done() #发送当前q中对应元素已被取出的消息,与后面的q.join()配合形成阻塞,保证q结束再执行其他程序,否则后面的q.join()将永远阻塞(后面程序无法执行)
if get_result==None: #判断生产者是否已经结束,即将data的所有数据都加入queue中
q.put(None) #该步很重要,当producer()放入的None被某个consumer()抽取后,其他consumer()就没有结束标志了。缺点是最后queue中始终留有结束标志
print("All data have been tooken out!")
break
print('consumer {} get {}; '.format(name,get_result)) #输出当前从queue中得到的数据
time.sleep(0.1)
def main():
data = list(range(10)) #待处理的原始数据
consumer_names = ['甲','乙','丙'] #2个消费者
q = queue.Queue(maxsize=5) #生成一个最大容量为5的queue
threads = [] #线程列表
p = threading.Thread(target=producer,args=('A',q,data)) #生成一个生产者线程对象,该生产者名为A
threads.append(p) #将该线程加入线程列表
for consumer_name in consumer_names: #为每一个消费者分配一个线程
t = threading.Thread(target=consumer,args=(consumer_name,q)) #消费者都从q里面拿数据
threads.append(t) #将五个消费者线程加入线程列表
for i in threads: #并列启动所有线程
i.setDaemon(True) #保证子线程在主线程退出时,无论出于什么状况都强制退出
i.start()
for i in threads: #将所有线程阻塞(即,不执行完,就不执行后面程序)
i.join()
# 判断q里面是否还有剩余的对象没有处理(包括None),有则挨个拿出,否则q不为空后面的q.join()将一直阻塞(后面程序无法执行)
if not q.empty():
for i in range(q.qsize()):
q.get()
q.task_done() #不能删除,作用于前一个q.task_done()相同,
q.join() #保证q阻塞,接受前面所有的q.task_done()发来的信息,否则程序一直停在该处不往下执行。(必须保证前面任何一处出现q.get()后都有q.task_done())
print("Program is over!")
if __name__ == '__main__':
main()
输出:
Producer A put 0;
consumer 甲 get 0;
Producer A put 1; consumer 乙 get 1;
Producer A put 2; consumer 丙 get 2;
Producer A put 3; consumer 甲 get 3;
Producer A put 4; consumer 乙 get 4;
Producer A put 5; consumer 丙 get 5;
Producer A put 6; consumer 甲 get 6;
Producer A put 7; consumer 乙 get 7;
Producer A put 8; consumer 丙 get 8;
Producer A put 9; consumer 甲 get 9;
All data have been tooken out!
All data have been tooken out!
All data have been tooken out!
Program is over!
上述程序的过程如下图:

注意:
(1)上述程序中生产者插入queue的时间间隔为0.1s,而消费者的取出时间间隔为2s,显然消费速度不如生产速度,一开始queue是空的,一段时间后queue就变满了,输出结果正说明了这一点。如果将两个时间调换,则结果相反,queue永远不会满,甚至只有1个值,因为只要进去就被消费了。
(2)消费者程序是通过“while”来推动不断执行的,何时结束?上例中通过在queue中增加None的形式告诉消费者,生产者已经结束了,消费者也可以结束了。但消费者有多个,到底由哪个消费者得到None?为解决这个问题,上例中在消费者中先判断当前取出的是不是None,如果是,则先在queue里插入一个None,然后再break当前这个消费者线程,最后的结果是所有的消费者线程都退出了,但queue中还剩下None没有被取出。因此在程序的后面增加了一个for循环来挨个把queue中的元素取出,否则最后的q.join()将永远阻塞,程序无法往下执行。
(3)程序中每一个q.get()后面都跟有一个q.task_done(),其作用是从queue中取出一个元素就给q.join()发送一个信息,否则q.join()将永远处于阻塞状态,直到所有queue元素都被取出。
多线程“生产者-消费者”模式一般性结构图


网友评论