美文网首页
【线程通信】生产者-消费者模式

【线程通信】生产者-消费者模式

作者: Djbfifjd | 来源:发表于2021-02-18 11:56 被阅读0次

    一、简述

    1️⃣生产者消费者模式并不是 GOF 提出的 23 种设计模式之一,23 种设计模式都是建立在面向对象的基础之上的,但其实面向过程的编程中也有很多高效的编程模式,生产者消费者模式便是其中之一,它是编程过程中最常用的一种设计模式。

    一个常见的场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。

    2️⃣举个寄信的例子,大致过程如下:

    1. 把信写好——相当于生产者制造数据。
    2. 把信放入邮筒——相当于生产者把数据放入缓冲区。
    3. 邮递员把信从邮筒取出——相当于消费者把数据取出缓冲区。
    4. 邮递员把信拿去邮局做相应的处理——相当于消费者处理数据。

    3️⃣说明

    1. 生产消费者模式可以有效的对数据解耦,优化系统结构。
    2. 降低生产者和消费者线程相互之间的依赖与性能要求。
    3. 一般使用 BlockingQueue 作为数据缓冲队列,是通过锁和阻塞来实现数据之间的同步。如果对缓冲队列有性能要求,则可以使用基于 CAS 无锁设计的 ConcurrentLinkedQueue

    二、生产者-消费者模式是一个经典的多线程设计模式

    【生产者-消费者模式】为多线程间的协作提供了良好的解决方案。在该模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

    Java 中一共有四种方法支持同步,其中前三个是同步方法,一个是管道方法。

    1. Object 的 wait()/notify()。
    2. Lock 和 Condition 的 await()/signal()。
    3. BlockingQueue 阻塞队列方法。
    4. PipedInputStream/PipedOutputStream

    三、wait()/notify()

    wait()/nofity() 是基类 Object 的两个方法,也就意味着所有 Java 类都有这两个方法,这样就可以为任何对象实现同步机制。

    1. wait():当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。
    2. notify():当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
    public class ProducerConsumer {
        private static final int CAPACITY = 5;
    
        public static void main(String args[]) {
            Queue<Integer> queue = new LinkedList<Integer>();
    
            Thread producer1 = new Producer("P-1", queue, CAPACITY);
            Thread producer2 = new Producer("P-2", queue, CAPACITY);
            Thread consumer1 = new Consumer("C1", queue, CAPACITY);
            Thread consumer2 = new Consumer("C2", queue, CAPACITY);
            Thread consumer3 = new Consumer("C3", queue, CAPACITY);
    
            producer1.start();
            producer2.start();
            consumer1.start();
            consumer2.start();
            consumer3.start();
        }
    }
    

    生产者

    public static class Producer extends Thread {
        private Queue<Integer> queue;
        String name;
        int maxSize;
        int i = 0;
    
        public Producer(String name, Queue<Integer> queue, int maxSize) {
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }
    
        @Override
        public void run() {
            while (true) {
                synchronized (queue) {
                    while (queue.size() == maxSize) {
                        try {
                            System.out.println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                            queue.wait();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                    System.out.println("[" + name + "] Producing value : +" + i);
                    queue.offer(i++);
                    queue.notifyAll();
    
                    try {
                        Thread.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    消费者

    public static class Consumer extends Thread {
        private Queue<Integer> queue;
        String name;
        int maxSize;
    
        public Consumer(String name, Queue<Integer> queue, int maxSize) {
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }
    
        @Override
        public void run() {
            while (true) {
                synchronized (queue) {
                    while (queue.isEmpty()) {
                        try {
                            System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                            queue.wait();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                    int x = queue.poll();
                    System.out.println("[" + name + "] Consuming value : " + x);
                    queue.notifyAll();
    
                    try {
                        Thread.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    判断 Queue 大小为 0 或者大于等于 queueSize 时须使用while(condition){},不能使用if(condition){}。其中while(condition),它又被叫做“自旋锁”。为防止该线程没有收到notify()调用也从wait()中返回(也称作虚假唤醒),这个线程会重新去检查 condition 条件以决定当前是否可以安全地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行了。

    四、使用 Lock 和 Condition 的 await()/signal()

    JDK5.0 之后,Java 提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。Condition 接口的 await()/signal() 就是其中用来做同步的两种方法,它们的功能基本上和 Object 的 wait()/nofity() 相同,完全可以取代它们,但是它们和新引入的锁定机制 Lock 直接挂钩,具有更大的灵活性。通过在 Lock 对象上调用newCondition(),将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。下面来看代码:

    /**
     * 生产者消费者模式:使用Lock和Condition实现
     * {@link java.util.concurrent.locks.Lock}
     * {@link java.util.concurrent.locks.Condition}
     */
    public class ProducerConsumerByLock {
        private static final int CAPACITY = 5;
        private static final Lock lock = new ReentrantLock();
        private static final Condition fullCondition = lock.newCondition();     //队列满的条件
        private static final Condition emptyCondition = lock.newCondition();    //队列空的条件
    
        public static void main(String args[]) {
            Queue<Integer> queue = new LinkedList<Integer>();
    
            Thread producer1 = new Producer("P-1", queue, CAPACITY);
            Thread producer2 = new Producer("P-2", queue, CAPACITY);
            Thread consumer1 = new Consumer("C1", queue, CAPACITY);
            Thread consumer2 = new Consumer("C2", queue, CAPACITY);
            Thread consumer3 = new Consumer("C3", queue, CAPACITY);
    
            producer1.start();
            producer2.start();
            consumer1.start();
            consumer2.start();
            consumer3.start();
        }
    }
    

    生产者

    public static class Producer extends Thread {
        private Queue<Integer> queue;
        String name;
        int maxSize;
        int i = 0;
    
        public Producer(String name, Queue<Integer> queue, int maxSize) {
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }
    
        @Override
        public void run() {
            while (true) {
    
                //获得锁
                lock.lock();
                while (queue.size() == maxSize) {
                    try {
                        System.out.println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                        //条件不满足,生产阻塞
                        fullCondition.await();
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println("[" + name + "] Producing value : +" + i);
                queue.offer(i++);
    
                //唤醒其他所有生产者、消费者
                fullCondition.signalAll();
                emptyCondition.signalAll();
    
                //释放锁
                lock.unlock();
    
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        }
    }
    

    消费者

    public static class Consumer extends Thread {
        private Queue<Integer> queue;
        String name;
        int maxSize;
    
        public Consumer(String name, Queue<Integer> queue, int maxSize) {
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }
    
        @Override
        public void run() {
            while (true) {
                //获得锁
                lock.lock();
    
                while (queue.isEmpty()) {
                    try {
                        System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                        //条件不满足,消费阻塞
                        emptyCondition.await();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                int x = queue.poll();
                System.out.println("[" + name + "] Consuming value : " + x);
    
                //唤醒其他所有生产者、消费者
                fullCondition.signalAll();
                emptyCondition.signalAll();
    
                //释放锁
                lock.unlock();
    
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    五、使用 BlockingQueue 阻塞队列方法

    JDK1.5 以后新增的java.util.concurrent包新增了 BlockingQueue 接口。并提供了如下几种阻塞队列实现:

    1. java.util.concurrent.ArrayBlockingQueue
    2. java.util.concurrent.LinkedBlockingQueue
    3. java.util.concurrent.SynchronousQueue
    4. java.util.concurrent.PriorityBlockingQueue

    实现生产者-消费者模型使用 ArrayBlockingQueue 或者 LinkedBlockingQueue 即可。

    这里使用 LinkedBlockingQueue,它是一个已经在内部实现了同步的队列,实现方式采用的是 await()/signal()。它可以在生成对象时指定容量大小。它用于阻塞操作的是 put()/take()。

    1. put():类似于上面的生产者线程,容量达到最大时,自动阻塞。
    2. take():类似于上面的消费者线程,容量为 0 时,自动阻塞。

    LinkedBlockingQueue 类的 put() 源码:

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();
    
    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();
    
    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();
    
    public void put(E e) throws InterruptedException {
        putLast(e);
    }
    
    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
    

    整体逻辑如下:

    /**
     * 生产者消费者模式:使用{@link java.util.concurrent.BlockingQueue}实现
     */
    public class ProducerConsumerByBQ{
        private static final int CAPACITY = 5;
    
        public static void main(String args[]){
            LinkedBlockingDeque<Integer> blockingQueue = new LinkedBlockingDeque<Integer>(CAPACITY);
    
            Thread producer1 = new Producer("P-1", blockingQueue, CAPACITY);
            Thread producer2 = new Producer("P-2", blockingQueue, CAPACITY);
            Thread consumer1 = new Consumer("C1", blockingQueue, CAPACITY);
            Thread consumer2 = new Consumer("C2", blockingQueue, CAPACITY);
            Thread consumer3 = new Consumer("C3", blockingQueue, CAPACITY);
    
            producer1.start();
            producer2.start();
            consumer1.start();
            consumer2.start();
            consumer3.start();
        }
    
        /**
         * 生产者
         */
        public static class Producer extends Thread{
            private LinkedBlockingDeque<Integer> blockingQueue;
            String name;
            int maxSize;
            int i = 0;
    
            public Producer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
                super(name);
                this.name = name;
                this.blockingQueue = queue;
                this.maxSize = maxSize;
            }
    
            @Override
            public void run(){
                while(true){
                    try {
                        blockingQueue.put(i);
                        System.out.println("[" + name + "] Producing value : +" + i);
                        i++;
    
                        //暂停最多1秒
                        Thread.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
            }
        }
    
        /**
         * 消费者
         */
        public static class Consumer extends Thread{
            private LinkedBlockingDeque<Integer> blockingQueue;
            String name;
            int maxSize;
    
            public Consumer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
                super(name);
                this.name = name;
                this.blockingQueue = queue;
                this.maxSize = maxSize;
            }
    
            @Override
            public void run(){
                while(true){
                    try {
                        int x = blockingQueue.take();
                        System.out.println("[" + name + "] Consuming : " + x);
    
                        //暂停最多1秒
                        Thread.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:【线程通信】生产者-消费者模式

          本文链接:https://www.haomeiwen.com/subject/wwugxltx.html