美文网首页程序员
再读《生产者与消费者》

再读《生产者与消费者》

作者: Armstrong_Q | 来源:发表于2017-03-16 22:57 被阅读95次

    多线程经常使用,生产者消费者模式也经常读到,但是实际理解可能还不够透彻,所以想写个日志梳理下知识的脉络,温故而知新。

    简单的模型

    先从一个例子开始吧,有一些角色我先声明如下:

    • 餐厅(Restaurant)--->载体
    • 厨师(Chef) --->生产者
    • 服务员(WaiterPerson) --->消费者
    • 食物(Meaf)--->被消费

    我梳理一下它们的工作流程:

    • 故事的地点发生在餐厅,它是载体,包括了厨师、服务员、食物。

    • 厨师在餐厅做饭,做完饭,饭放在橱窗,通知服务员端走,送给客人吃完;期间,厨师会不断地监控橱窗的食物是否被端走,如果端走则继续做新的食物,否则等待。

    • 服务员也不能闲着,它时刻留心着橱窗是否有食物上架,如果没有则继续等待。如果有食物则端走,并通知厨师,我端走食物了,你可以做新食物了。

    那么按照上面的步骤,首先我们看看生产者Chef的基本代码

       synchronized(this){
           while (restaurant.meal != null) {
                  wait(); 
           }
      }
    

    上面代码表示,倘若食物已经做好一份了,厨师不断监控橱窗上面的食物,如果没有被服务员端走(消费),那我厨师就继续等待,多休息一会。注意这里用while而不是if是因为防止多个消费者产生竞争引起并发问题。

     System.out.println("饭做好了,订单生成...")。
      synchronized(restaurant.waiter){
           restaurant.meal=new Meal();
           restaurant.waiter.notifyAll();
     }
    

    上面代码表示 ,厨师没有等待了,他开始做饭(生产),完成生产食物后,厨师通知(notifyAll)正在橱窗等待食物的服务员,叫他去端菜(消费)。

    那消费者Waiter的流程呢?我想过程应该是和生产者恰好是对立的。

    synchronized(this){
        while(restaurant.meal==null){
               wait();
          }
    }
    

    上面代码表示,服务员不断监控橱窗上面的食物有没有做好,如果没有做好,那我服务员就继续等待。 是吧?和前面的生产者的判断条件刚好对立。

     System.out.println("我服务员把饭端走了...")。
     synchronized(restaurant.chef){
           restaurant.meal=null;
           restaurant.chef.notifyAll();
    }
    

    上面代码表示 ,服务员被厨师通知端饭(消费)了,于是他开始端饭送个客人,导致橱窗上没有饭了,之后,服务员通知(notifyAll)橱窗口正在等待的厨师去做下一道菜(生产)。

    通过上面的例子,我们可以初步了解生产者与消费者的工作模式。但是实际开发场景中,应该有不止一个生产者或者消费者,而且食物应该很多,那么这个时候我们应该引入队列(Queque)这个数据结构来管理它们了。

    利用队列管理生产者与消费者

    我们可以设想一下,在餐厅中的业务场景,厨师chef应该作为Runable角色可以有多个,我们可以用Excutor.submit(r)提交很多个厨师,让其工作, 而服务员我们也可以有多个,同理,我们也把他放入线程池去运行。 而食物Meal也有多个,并且我们要用一个数据结构存取它,让它作为厨师和服务员两者共同占有的资源又能做好同步处理。在上面的例子中,我们用wait(),notifyAll(),synchronized等方法进行食物的同步与通信。它们有一个明显的缺点,我们发现代码很是耦合,晦涩难懂,暂且不谈性能。

    让开发者欣慰的事,JDK中提供了BlockQueque接口来存取“食物”。它是一个阻塞队列的数据结构。在这里,我们需要了解两点;

    • 在开发过程中,"食物"常常指是的IO流。如网络编程中,服务端与客户端发送字节流相互通信。现在有netty或者nio等异步非阻塞IO的框架,让并发性能更佳。

    • 阻塞是为了保证生产者与消费者步调一致,不要产生大量浪费的食物,消费者吃不完,导致资源耗尽。亦或者消费者盲目的去找生产者要食物,太多消费者拥挤,也会消耗资源。所以在刚刚开始的时候,jdk做了这个BlockQueque来管理食物和生产者和消费者通信。 生产者要生产食物如下面的代码:

      @Override
        public void run() {
        try {
            while (!Thread.interrupted()) {
                Meal meal = new Meal(++count);
                mBlockQueque.put(meal);// 如果mBlockQueque容量不为empty则阻塞等待。                                             
                TimeUnit.SECONDS.sleep(2);//模拟生产耗时任务。
             }
          } catch (InterruptedException e) {
            System.out.println("Chef sleep end interrupted...");
            e.printStackTrace();
          }
      }
      

    上面mBlockQueque.put()为阻塞方法(如果橱窗(队列)还有食物未被领取,则等待不生产食物,否则生产食物并添加至橱窗),如注释上的说明,它的作用类似wait()/add();我们跟踪下源码:

       /**
         * @throws NullPointerException {@inheritDoc}
         * @throws InterruptedException {@inheritDoc}
         */
        public void putFirst(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            Node<E> node = new Node<E>(e);
            final ReentrantLock lock = this.lock;
            lock.lock(); //                                            ---(1)
            try {
                while (!linkFirst(node))
                    notFull.await();                                   ---(2)
            } finally {
                lock.unlock();
            }
        }
    

    我解析下上面的代码:put()是一个接口方法,它具体的实现方法之一是putFirst,给链表首位添加一个元素。

    1. (1)此处有lock-finally-unlock组成的临界区。它的作用类似synchronized,用来同步。它们之间不同的地方是:

    一、用synchronized声明锁时,任务A和任务B,都要获取锁O,如果A首先获得锁O,B则一直等待直到A释放锁,B一直阻塞着不能被中断。
    二、用lock-finally-unlock声明锁时,任务A和任务B,都要获取锁O,如果A首先获得锁O,B可以等待一段时间,不想等待了,可以自行中断。A如果想释放锁必须在finally后调用unlock。所以说我觉得lock更加灵活。
    但是在大多数资源竞争不太激烈的情况下,我们还是用synchronized足够了。

    1. (2)此处notFull是Condition的实例。它提供更好的性能,通过await()/signal()方法扮演之前的wait()/notify()的角色。 这里代码是指while判断链表是否超过容量,返回false时,则调用await()阻塞等待当前任务线程。

    我们分析完生产者chef,我们来看看消费者waiter的改造后的代码:

    @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    Meal meal = mBlockQueque.take();//从队列中remove出一个食物,没有食物则阻塞等待
                    meal.run();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    上述代码中,mBlockQueque.take()是一个可阻塞方法。它试图从橱窗队列上取食物,如果发现没有食物就阻塞消费者线程。看看take()的具体实现的源码:

     public E takeFirst() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E x;
                while ( (x = unlinkFirst()) == null) //                       ---(1)
                    notEmpty.await();                                         ---(2)
                return x;
            } finally {
                lock.unlock();
            }
        }
    

    (1)takeFisrt()方法中会有去调用unlinkFirst()去队列返回一个食物,如果有食物,就返回,并调用notFull.signal()唤醒正在阻塞的生产者线程。
    (2) notEmpty是另外一个Condition实例,它用来和消费者线程通信。如果发现返回的食物为空,则notEmpty.await()让消费者线程阻塞等待。
    至此。我们看到我们把具体的通信交互过程封装到了阻塞队列BlockQueue里。 生产者只需要调用take通信,消费者只需调用put通信。如下图:

    通信结构

    写到这里了,那生产者与消费者模式有哪些实际应用呢? 我想线程池应该是应用最广泛的地方。下一篇我将详细介绍线程池的原理。

    注:部分参考自《Java 编程思想》

    相关文章

      网友评论

        本文标题:再读《生产者与消费者》

        本文链接:https://www.haomeiwen.com/subject/gcmlnttx.html