美文网首页
生产者消费者模式

生产者消费者模式

作者: chen_000 | 来源:发表于2017-06-12 21:02 被阅读0次

    什么是生产者消费者模式

    在软件开发的过程中,经常碰到这样的场景:

    某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。

    举一个寄信的例子。假设你要寄一封信,大致过程如下:

    你把信写好——相当于生产者生产数据

    你把信放入邮箱——相当于生产者把数据放入缓冲区

    邮递员把信从邮箱取出,做相应处理——相当于消费者把数据取出缓冲区,处理数据。

    生产者消费者模式的优点

    解耦

    假设生产者和消费者分别是两个线程。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。如果未来消费者的代码发生变化,可能会影响到生产者的代码。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

    举个例子,我们去邮局投递信件,如果不使用邮箱(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他。这就产生了你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员 换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮箱相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。

    并发

    由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区通信的,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。

    继续上面的例子,如果我们不使用邮箱,就得在邮局等邮递员,直到他回来,把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞)。或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。

    支持忙闲不均

    当生产者制造数据快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,慢慢处理掉。而不至于因为消费者的性能造成数据丢失或影响生产者生产。

    我们再拿寄信的例子,假设邮递员一次只能带走1000封信,万一碰上情人节(或是圣诞节)送贺卡,需要寄出去的信超过了1000封,这时候邮箱这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮箱中,等下次过来时再拿走。

    通过上面的介绍大家应该已经明白了生产者消费者模式。

    Python中的多线程编程

    在实现生产者消费者模式之前,我们先学习下Python中的多线程编程。

    线程是操作系统直接支持的执行单元,高级语言通常都内置多线程的支持,Python也不例外,并且Python的线程是真正的Posix Thread,而不是模拟出来的线程。

    Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

    下面我们先看一段在Python中实现多线程的代码。

    importtime,threading

    #线程代码

    classTaskThread(threading.Thread):

    def __init__(self,name):

    threading.Thread.__init__(self,name=name)

    def run(self):

    print('thread %s is running...'%self.getName())

    foriinrange(6):

    print('thread %s >>> %s'%(self.getName(),i))

    time.sleep(1)

    print('thread %s finished.'%self.getName())

    taskthread=TaskThread('TaskThread')

    taskthread.start()

    taskthread.join()

    下面是程序的执行结果:

    thread TaskThreadisrunning...

    threadTaskThread>>>0

    threadTaskThread>>>1

    threadTaskThread>>>2

    threadTaskThread>>>3

    threadTaskThread>>>4

    threadTaskThread>>>5

    thread TaskThreadfinished.

    TaskThread类继承自threading模块中的Thread线程类。构造函数的name参数指定线程的名字,通过重载基类run函数实现具体任务。

    在简单熟悉了Python的线程后,下面我们实现一个生产者消费者模式。

    from Queue import Queue

    importrandom,threading,time

    #生产者类

    classProducer(threading.Thread):

    def __init__(self,name,queue):

    threading.Thread.__init__(self,name=name)

    self.data=queue

    def run(self):

    foriinrange(5):

    print("%s is producing %d to the queue!"%(self.getName(),i))

    self.data.put(i)

    time.sleep(random.randrange(10)/5)

    print("%s finished!"%self.getName())

    #消费者类

    classConsumer(threading.Thread):

    def __init__(self,name,queue):

    threading.Thread.__init__(self,name=name)

    self.data=queue

    def run(self):

    foriinrange(5):

    val=self.data.get()

    print("%s is consuming. %d in the queue is consumed!"%(self.getName(),val))

    time.sleep(random.randrange(10))

    print("%s finished!"%self.getName())

    def main():

    queue=Queue()

    producer=Producer('Producer',queue)

    consumer=Consumer('Consumer',queue)

    producer.start()

    consumer.start()

    producer.join()

    consumer.join()

    print'All threads finished!'

    if__name__=='__main__':

    main()

    执行结果可能如下:

    Producerisproducing0tothequeue!

    Consumerisconsuming.0inthe queueisconsumed!

    Producerisproducing1tothequeue!

    Producerisproducing2tothequeue!

    Consumerisconsuming.1inthe queueisconsumed!

    Consumerisconsuming.2inthe queueisconsumed!

    Producerisproducing3tothequeue!

    Producerisproducing4tothequeue!

    Producerfinished!

    Consumerisconsuming.3inthe queueisconsumed!

    Consumerisconsuming.4inthe queueisconsumed!

    Consumerfinished!

    All threadsfinished!

    因为多线程是抢占式执行的,所以打印出的运行结果不一定和上面的完全一致。

    相关文章

      网友评论

          本文标题:生产者消费者模式

          本文链接:https://www.haomeiwen.com/subject/mekrqxtx.html