美文网首页
并发编程专题-5:生产者和消费者模式

并发编程专题-5:生产者和消费者模式

作者: 开发者如是说 | 来源:发表于2019-04-08 21:26 被阅读0次

所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。

在实现生产者消费者问题时,可以采用三种方式:

  1. 使用 Object 的 wait()/notify() 的消息通知机制;
  2. 使用 Lock 的 Condition 的 await()/signal() 的消息通知机制;
  3. 使用 BlockingQueue 实现。

1、基于 Object 的 wait()/notifyAll() 实现方式

1.1 理论基础

关于 wait()notify()notifyAll() 的理论基础,可以参考笔者的文章,

《并发编程专题 2:使用多线程编程》 的 《线程控制 wait()、notify() 和 notifyAll()》 一节。

1.2 基于 Object 的 wait()/notifyAll() 生产者消费者模式

    // 生产者
    private static class Consumer implements Runnable {
        private List<Object> products;

        // 传入的对象是产品,也就是说,生产者和消费者通过产品建立联系
        public Consumer(List<Object> products) {
            this.products = products;
        }

        public void run() {
            while (true) { // 使用循环来不断消费
                synchronized (products) { // 对产品加锁 products
                    while (products.isEmpty()) { // 1. 没有产品了
                        try { // 调用 wait() 的时候使用 tru...catch 防止线程终端
                            products.wait(); // 已经没有产品可以消费了
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    products.remove(0); // 消费一个
                    products.notifyAll(); // 通知其他线程
                    System.out.println("Eat one. Left : " + products.size());
                }
            }
        }
    }

    // 消费者
    private static class Producer implements Runnable {
        private final int max; // 产品的上限
        private List<Object> products;

        // 参数是产品的上限和产品列表(理解成仓库和仓库的最大容量亦可)
        public Producer(int max, List<Object> products) {
            this.max = max;
            this.products = products;
        }

        public void run() {
            while (true) {
                synchronized (products) {
                    while (products.size() > max) { // 2. 大于上限就停止生产
                        try {
                            products.wait(); // 暂停生成
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    products.add(new Object()); // 生成一个
                    products.notifyAll(); // 唤醒
                    System.out.println("Made one. Total : " + products.size());
                }
            }
        }
    }

    // 模拟 & 验证
    public static void main(String...args) {
        final int MAX_PRODUCTS = 20;
        List<Object> products = new LinkedList<>();
        Executor executor = Executors.newCachedThreadPool(); // 使用线程池
        executor.execute(new Producer(MAX_PRODUCTS, products));
        executor.execute(new Consumer(products));
        executor.execute(new Consumer(products));
    }

上面的生产者和消费者模式示例代码的注释已经非常详尽了,这里有几个问题需要说明一下。

首先是上述注释中的 1 和 2,这里我们使用的是 while 循环而不是 if 语句。这是因为假如存在多个消费者并且都因为产品不足而发生阻塞,当消费者调用了 notifyAll() 将它们唤醒的时候,其中的一个被唤醒并且消费了产品,释放了锁,而此时生产者还没有生成出新的产品,另一个生产者获得了锁,并进行消费,那么此时会因为没有可以消费的产品而抛出异常。所以,在消费者和生产者中,我们都使用 while 循环而不是 if. 这样,按照上述情形,另一个生产者会继续判断是否有可以消费的产品,如果没有的话,将会继续 wait.

另外就是使用 notifyAll() 而不是 notify() 唤醒。假如我们存在多个消费者在等待,此时一个消费者使用 notify() 唤醒的依然是消费者,那么线程将进入假死状态。

2、Lock 的 Condition 的 await()/signal() 的实现方式

使用 Lock 的实现方式与使用 Object 的 wait()notifyAll() 的实现方式原理是一致的。

    // 定义锁、仓库已满的条件和仓库为空的条件
    private static ReentrantLock lock = new ReentrantLock();
    private static final Condition full = lock.newCondition();
    private static final Condition empty = lock.newCondition();

    // 消费者
    private static class Consumer implements Runnable {
        private List<Object> products;

        public Consumer(List<Object> products) {
            this.products = products;
        }

        public void run() {
            while (true) {
                lock.lock(); // 加锁
                try {
                    while (products.isEmpty()) { // 没有可消费的产品
                        try {
                            empty.await(); // 没有可用的产品了,等待
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    products.remove(0); // 消费一个
                    full.signalAll(); // 唤醒所有生产者
                    empty.signalAll(); // 唤醒所有消费者
                    System.out.println("Eat one. Left : " + products.size());
                } finally {
                    lock.unlock(); // 释放锁
                }
            }
        }
    }

    // 生产者
    private static class Producer implements Runnable {
        private final int max;
        private List<Object> products;

        public Producer(int max, List<Object> products) {
            this.max = max;
            this.products = products;
        }

        public void run() {
            while (true) {
                lock.lock(); // 加锁
                try {
                    while (products.size() > max) { // 已经达到了最大的产量
                        try {
                            full.await(); // 已达最大产量,等待
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    products.add(new Object()); // 生产一个
                    full.signalAll(); 
                    empty.signalAll();
                    System.out.println("Made one. Total : " + products.size());
                } finally {
                    lock.unlock(); // 释放锁
                }
            }
        }
    }

    public static void main(String...args) {
        final int MAX_PRODUCTS = 20;
        List<Object> products = new LinkedList<>();
        Executor executor = Executors.newCachedThreadPool();
        executor.execute(new Producer(MAX_PRODUCTS, products));
        executor.execute(new Consumer(products));
        executor.execute(new Consumer(products));
    }

与第一种情形类似,在 wait()/notifyAll() 的实现中,加锁是对 products 进行的,也就是说,生产者和消费者通过产品(或者 “仓库”)联系到了一起。而上面的情形中,生产者和消费者通过 lock, fullempty 三个条件关联。

3、基于 BlockingQueue 实现

从上面的两种实现方式中,我们也可以看出,实现生产和消费协调的关键,就是当生产达到最大数量或者可以消费的达到了最小值的时候,使生产者和消费者线程进行阻塞。既然是阻塞,那我们为什么不能直接通过阻塞队列来实现呢?

    private static class Consumer implements Runnable {

        private BlockingQueue<Object> products;

        public Consumer(BlockingQueue<Object> products) {
            this.products = products;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    products.take(); // 取不到数据的时候自动阻塞
                    System.out.println("Consumed one, Total " + products.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // 生产者
    private static class Producer implements Runnable {

        private BlockingQueue<Object> products;

        public Producer(BlockingQueue<Object> products) {
            this.products = products;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    products.put(new Object()); // 当达到了最大数量的时候会阻塞
                    System.out.println("Produced one, Total " + products.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String...args) {
        final int MAX_PRODUCTS = 20;
        BlockingQueue<Object> products = new LinkedBlockingDeque<>(MAX_PRODUCTS);
        Executor executor = Executors.newCachedThreadPool();
        executor.execute(new Producer(products));
        executor.execute(new Consumer(products));
        executor.execute(new Consumer(products));
    }

从上面我们可以看出,这种实现方式的思想非常接近与我们第一种实现方式,即都是通过产品来在生产者和消费者之间建立联系。使用阻塞队列的实现方式优势明显,没有线程控制,代码更加简洁。

相关文章

  • 阻塞队列实现生产者消费者模式

    生产者消费者模式是并发、多线程编程中经典的设计模式,生产者和消费者通过分离的执行工作解耦,简化了开发模式,生产者和...

  • 多线程之生产者与消费者问题

    生产者消费者模式是并发、多线程编程中经典的设计模式,生产者和消费者通过分离的执行工作解耦,简化了开发模式,生产者和...

  • 生产者消费者模式在GoLang中的应用

    在并发编程中生产者消费者模式模式可以解决大多数并发问题。该模式通过一个阻塞队列来平衡生产者和消费者之间的工作能力,...

  • 第十一章 并发编程实践

    并发编程实战。排查并发编程问题的方法。 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。 11.1 生...

  • Java多线程之生产者和消费者模型

    1 多线程中生产者和消费者 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消...

  • Go Observer

    生产者消费者模式 并发编程中最常见的是生产者消费者模式,该模式主要是通过平衡生产线程和消费线程的工作能力来提高程序...

  • 一篇文章,让你彻底弄懂生产者--消费者问题

    生产者-消费者模式是一个十分经典的多线程并发协作的模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深。所谓...

  • 面试官:小伙子,说一下实现生产者消费者有几种方式?

    前言 生产者-消费者模式是一个十分经典的多线程并发协作的模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深...

  • 并发编程专题-5:生产者和消费者模式

    所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,...

  • Java并发编程实践

    使用并发在提高程序运行速度的同时,也会带来更多的问题和风险。 生产者和消费者模式 在并发种使用生产者和消费者模式能...

网友评论

      本文标题:并发编程专题-5:生产者和消费者模式

      本文链接:https://www.haomeiwen.com/subject/xzrmiqtx.html