美文网首页
生产者-消费者(wait-notifyAll)

生产者-消费者(wait-notifyAll)

作者: 追梦小蜗牛 | 来源:发表于2019-12-20 12:21 被阅读0次
    black-sedan-parked-under-tree-1841120.jpg

    简介:

    生产者、消费者真的是老生常谈了,很多地方都会用到这种思想,可见其重要程度。所以,还是有必要了解一下它的最原始的实现方式的,也就是利用Object的wait和notifyAll方法。

    源码:

    /**
         * Causes the current thread to wait until another thread invokes the
         * {@link java.lang.Object#notify()} method or the
         * {@link java.lang.Object#notifyAll()} method for this object.
         * In other words, this method behaves exactly as if it simply
         * performs the call {@code wait(0)}.
         * <p>
         * The current thread must own this object's monitor. The thread
         * releases ownership of this monitor and waits until another thread
         * notifies threads waiting on this object's monitor to wake up
         * either through a call to the {@code notify} method or the
         * {@code notifyAll} method. The thread then waits until it can
         * re-obtain ownership of the monitor and resumes execution.
         * <p>
         * As in the one argument version, interrupts and spurious wakeups are
         * possible, and this method should always be used in a loop:
         * <pre>
         *     synchronized (obj) {
         *         while (&lt;condition does not hold&gt;)
         *             obj.wait();
         *         ... // Perform action appropriate to condition
         *     }
         * </pre>
         * This method should only be called by a thread that is the owner
         * of this object's monitor. See the {@code notify} method for a
         * description of the ways in which a thread can become the owner of
         * a monitor.
         *
         * @throws  IllegalMonitorStateException  if the current thread is not
         *               the owner of the object's monitor.
         * @throws  InterruptedException if any thread interrupted the
         *             current thread before or while the current thread
         *             was waiting for a notification.  The <i>interrupted
         *             status</i> of the current thread is cleared when
         *             this exception is thrown.
         * @see        java.lang.Object#notify()
         * @see        java.lang.Object#notifyAll()
         */
        public final void wait() throws InterruptedException {
            wait(0);
        }
    
        /**
         * Wakes up all threads that are waiting on this object's monitor. A
         * thread waits on an object's monitor by calling one of the
         * {@code wait} methods.
         * <p>
         * The awakened threads will not be able to proceed until the current
         * thread relinquishes the lock on this object. The awakened threads
         * will compete in the usual manner with any other threads that might
         * be actively competing to synchronize on this object; for example,
         * the awakened threads enjoy no reliable privilege or disadvantage in
         * being the next thread to lock this object.
         * <p>
         * This method should only be called by a thread that is the owner
         * of this object's monitor. See the {@code notify} method for a
         * description of the ways in which a thread can become the owner of
         * a monitor.
         *
         * @throws  IllegalMonitorStateException  if the current thread is not
         *               the owner of this object's monitor.
         * @see        java.lang.Object#notify()
         * @see        java.lang.Object#wait()
         */
        public final native void notifyAll();
    

    思路:

    首先要想的是生产者和消费者分别需要用java里面的什么来实现,毫无疑问,肯定要用线程了,因为它是最基本的调度单元,可以在其run方法里面处理自定义逻辑。确定了线程之后,还要找一个地方来存储生产者生产出来的产品,首先这个容器要有序的,那就用最简单的ArrayList了。有的时候为了测试方便,我们有可能要协调多个线程,可能需要借助于CountDownLatch来实现。

    举例1:

    这个例子展示的是,开启多个生产者生产一定量的产品放进队列,等所有的生产者都生产完之后,打印生产了多少个产品。然后,多个消费者再开始消费,消费完之后,再打印,容器里面还剩余多少个产品。以此体验生产和消费的思想,大家可以在自己的IDE上运行一下。

        @Test
        public void test01() throws InterruptedException {
            CountDownLatch countDownLatchProducer = new CountDownLatch(15);
            CountDownLatch countDownLatchConsumer = new CountDownLatch(6);
            for (int i = 0; i < 3; i++) {//定义3个线程
                new Thread(new Producer(i,countDownLatchProducer)).start();
            }
    //        Thread.sleep(20000);
            countDownLatchProducer.await();//当所有的生产者子线程的任务都完成之后,就会恢复继续向下执行。
            System.out.println("消费前大小:"+Queue.getList().size());
            for (int i = 0; i < 3; i++) {
                new Thread(new Consumer(countDownLatchConsumer)).start();
            }
    //        Thread.sleep(20000);
            countDownLatchConsumer.await();//当所有的消费者子线程的任务都完成之后,就会恢复继续向下执行。
            System.out.println("消费后大小:"+Queue.getList().size());
        }
    
        class Producer implements Runnable {
    
            private int i;
            private CountDownLatch countDownLatch;
    
            public Producer(int i,CountDownLatch countDownLatch) {
                this.i = i;
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 5; i++) {//每个生产者线程每次生产5个对象
                        Queue.put(new Bread(i));
                        countDownLatch.countDown();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        static class Consumer implements Runnable {
    
            private CountDownLatch countDownLatch;
    
            public Consumer(CountDownLatch countDownLatch) {
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 2; i++) {//每个消费者线程一次消费2个对象
                        Bread bread = Queue.get();
                        countDownLatch.countDown();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    public class Queue {
    
        private final static List<Bread> list = new ArrayList<>(15);
    
        public static List getList() {
            return list;
        }
    
        static void put(Bread bread) throws InterruptedException {//进队列
            synchronized (list) {
                while (list.size() >= 15) {//限制容器大小最大值为15
                    list.wait();
                }
                list.add(bread);
                list.notifyAll();
            }
        }
    
        static Bread get() throws InterruptedException {//出队列
            Bread bread = null;
            synchronized (list) {
                if (list.size() == 0)
                    list.wait();
                bread = list.remove(list.size() - 1);
                list.notifyAll();
            }
            return bread;
        }
    }
    
    package com.hao.hellolearn.jdk.producerconsumer;
    
    public class Bread {
    
        private int value;
    
        public Bread(int value) {
            this.value = value;
        }
    
        public int getValue() {
            return value;
        }
    
        public void setValue(int value) {
            this.value = value;
        }
    }
    

    举例2:

    这个例子展示的是:当容器的容量不足以装下生产者生产的任务的时候,生产者的线程就会阻塞在put方法上,直到消费者开始消费容器中的任务,生产者线程才会恢复执行,可以通过sleep方法来协助测试。

    public class WaitNotifyImpl {
    
        @Test
        public void test01() throws InterruptedException {
            for (int i = 0; i < 3; i++) {//定义3个线程
                new Thread(new Producer(i)).start();
            }
            Thread.sleep(20000);
            System.out.println("消费前大小:"+Queue.getList().size());
            for (int i = 0; i < 3; i++) {
                new Thread(new Consumer()).start();
            }
            Thread.sleep(20000);
            System.out.println("消费后大小:"+Queue.getList().size());
        }
    
        class Producer implements Runnable {
    
            private int i;
    
            public Producer(int i) {
                this.i = i;
            }
    
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 5; i++) {//每个生产者线程每次生产5个对象
                        Queue.put(new Bread(i));
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        static class Consumer implements Runnable {
    
            public Consumer() {
            }
    
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 2; i++) {//每个消费者线程一次消费2个对象
                        Bread bread = Queue.get();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    当第一个sleep执行的时候,可以在cmd里面分别执行jps -l 查看运行中的进程的pid,然后用jstack -l pid 来打印出这个进程的堆栈信息,可以看到线程阻塞在object monitor上面。1处的是生产者线程的状态是WAITING是有object.wait 引起的,2处的是TIMED_WAITING是由sleep引起的。


    生产者-消费者-jstack.png

    总结:

    生产者-消费者 生产出来的产品放在一个容器里面,消费者从容器里面取出来;如果生产速率慢,消费速度快,那么当容器为空的时候,消费者会阻塞在当前容器(什么事也做不了,静静的等待),直到容器里面被放进一个产品,然后争抢着。如果消费速率慢,生产速度快,那么当容器满了的时候生产者会阻塞在当前容器(什么事也做不了,静静的等待),直到容器里面的某一个产品被消费掉。它的本质其实就是线程之间的协作。
    觉察即自由。

    相关文章

      网友评论

          本文标题:生产者-消费者(wait-notifyAll)

          本文链接:https://www.haomeiwen.com/subject/ddlvnctx.html