美文网首页
并发编程专题-5:生产者和消费者模式

并发编程专题-5:生产者和消费者模式

作者: 开发者如是说 | 来源:发表于2019-04-08 21:26 被阅读0次

    所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。

    在实现生产者消费者问题时,可以采用三种方式:

    1. 使用 Object 的 wait()/notify() 的消息通知机制;
    2. 使用 Lock 的 Condition 的 await()/signal() 的消息通知机制;
    3. 使用 BlockingQueue 实现。

    1、基于 Object 的 wait()/notifyAll() 实现方式

    1.1 理论基础

    关于 wait()notify()notifyAll() 的理论基础,可以参考笔者的文章,

    《并发编程专题 2:使用多线程编程》 的 《线程控制 wait()、notify() 和 notifyAll()》 一节。

    1.2 基于 Object 的 wait()/notifyAll() 生产者消费者模式

        // 生产者
        private static class Consumer implements Runnable {
            private List<Object> products;
    
            // 传入的对象是产品,也就是说,生产者和消费者通过产品建立联系
            public Consumer(List<Object> products) {
                this.products = products;
            }
    
            public void run() {
                while (true) { // 使用循环来不断消费
                    synchronized (products) { // 对产品加锁 products
                        while (products.isEmpty()) { // 1. 没有产品了
                            try { // 调用 wait() 的时候使用 tru...catch 防止线程终端
                                products.wait(); // 已经没有产品可以消费了
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        products.remove(0); // 消费一个
                        products.notifyAll(); // 通知其他线程
                        System.out.println("Eat one. Left : " + products.size());
                    }
                }
            }
        }
    
        // 消费者
        private static class Producer implements Runnable {
            private final int max; // 产品的上限
            private List<Object> products;
    
            // 参数是产品的上限和产品列表(理解成仓库和仓库的最大容量亦可)
            public Producer(int max, List<Object> products) {
                this.max = max;
                this.products = products;
            }
    
            public void run() {
                while (true) {
                    synchronized (products) {
                        while (products.size() > max) { // 2. 大于上限就停止生产
                            try {
                                products.wait(); // 暂停生成
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        products.add(new Object()); // 生成一个
                        products.notifyAll(); // 唤醒
                        System.out.println("Made one. Total : " + products.size());
                    }
                }
            }
        }
    
        // 模拟 & 验证
        public static void main(String...args) {
            final int MAX_PRODUCTS = 20;
            List<Object> products = new LinkedList<>();
            Executor executor = Executors.newCachedThreadPool(); // 使用线程池
            executor.execute(new Producer(MAX_PRODUCTS, products));
            executor.execute(new Consumer(products));
            executor.execute(new Consumer(products));
        }
    

    上面的生产者和消费者模式示例代码的注释已经非常详尽了,这里有几个问题需要说明一下。

    首先是上述注释中的 1 和 2,这里我们使用的是 while 循环而不是 if 语句。这是因为假如存在多个消费者并且都因为产品不足而发生阻塞,当消费者调用了 notifyAll() 将它们唤醒的时候,其中的一个被唤醒并且消费了产品,释放了锁,而此时生产者还没有生成出新的产品,另一个生产者获得了锁,并进行消费,那么此时会因为没有可以消费的产品而抛出异常。所以,在消费者和生产者中,我们都使用 while 循环而不是 if. 这样,按照上述情形,另一个生产者会继续判断是否有可以消费的产品,如果没有的话,将会继续 wait.

    另外就是使用 notifyAll() 而不是 notify() 唤醒。假如我们存在多个消费者在等待,此时一个消费者使用 notify() 唤醒的依然是消费者,那么线程将进入假死状态。

    2、Lock 的 Condition 的 await()/signal() 的实现方式

    使用 Lock 的实现方式与使用 Object 的 wait()notifyAll() 的实现方式原理是一致的。

        // 定义锁、仓库已满的条件和仓库为空的条件
        private static ReentrantLock lock = new ReentrantLock();
        private static final Condition full = lock.newCondition();
        private static final Condition empty = lock.newCondition();
    
        // 消费者
        private static class Consumer implements Runnable {
            private List<Object> products;
    
            public Consumer(List<Object> products) {
                this.products = products;
            }
    
            public void run() {
                while (true) {
                    lock.lock(); // 加锁
                    try {
                        while (products.isEmpty()) { // 没有可消费的产品
                            try {
                                empty.await(); // 没有可用的产品了,等待
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        products.remove(0); // 消费一个
                        full.signalAll(); // 唤醒所有生产者
                        empty.signalAll(); // 唤醒所有消费者
                        System.out.println("Eat one. Left : " + products.size());
                    } finally {
                        lock.unlock(); // 释放锁
                    }
                }
            }
        }
    
        // 生产者
        private static class Producer implements Runnable {
            private final int max;
            private List<Object> products;
    
            public Producer(int max, List<Object> products) {
                this.max = max;
                this.products = products;
            }
    
            public void run() {
                while (true) {
                    lock.lock(); // 加锁
                    try {
                        while (products.size() > max) { // 已经达到了最大的产量
                            try {
                                full.await(); // 已达最大产量,等待
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        products.add(new Object()); // 生产一个
                        full.signalAll(); 
                        empty.signalAll();
                        System.out.println("Made one. Total : " + products.size());
                    } finally {
                        lock.unlock(); // 释放锁
                    }
                }
            }
        }
    
        public static void main(String...args) {
            final int MAX_PRODUCTS = 20;
            List<Object> products = new LinkedList<>();
            Executor executor = Executors.newCachedThreadPool();
            executor.execute(new Producer(MAX_PRODUCTS, products));
            executor.execute(new Consumer(products));
            executor.execute(new Consumer(products));
        }
    

    与第一种情形类似,在 wait()/notifyAll() 的实现中,加锁是对 products 进行的,也就是说,生产者和消费者通过产品(或者 “仓库”)联系到了一起。而上面的情形中,生产者和消费者通过 lock, fullempty 三个条件关联。

    3、基于 BlockingQueue 实现

    从上面的两种实现方式中,我们也可以看出,实现生产和消费协调的关键,就是当生产达到最大数量或者可以消费的达到了最小值的时候,使生产者和消费者线程进行阻塞。既然是阻塞,那我们为什么不能直接通过阻塞队列来实现呢?

        private static class Consumer implements Runnable {
    
            private BlockingQueue<Object> products;
    
            public Consumer(BlockingQueue<Object> products) {
                this.products = products;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        products.take(); // 取不到数据的时候自动阻塞
                        System.out.println("Consumed one, Total " + products.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        // 生产者
        private static class Producer implements Runnable {
    
            private BlockingQueue<Object> products;
    
            public Producer(BlockingQueue<Object> products) {
                this.products = products;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        products.put(new Object()); // 当达到了最大数量的时候会阻塞
                        System.out.println("Produced one, Total " + products.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        public static void main(String...args) {
            final int MAX_PRODUCTS = 20;
            BlockingQueue<Object> products = new LinkedBlockingDeque<>(MAX_PRODUCTS);
            Executor executor = Executors.newCachedThreadPool();
            executor.execute(new Producer(products));
            executor.execute(new Consumer(products));
            executor.execute(new Consumer(products));
        }
    

    从上面我们可以看出,这种实现方式的思想非常接近与我们第一种实现方式,即都是通过产品来在生产者和消费者之间建立联系。使用阻塞队列的实现方式优势明显,没有线程控制,代码更加简洁。

    相关文章

      网友评论

          本文标题:并发编程专题-5:生产者和消费者模式

          本文链接:https://www.haomeiwen.com/subject/xzrmiqtx.html