美文网首页
常见的生产者消费者模式的实现

常见的生产者消费者模式的实现

作者: 科威舟VS求利 | 来源:发表于2020-12-26 14:14 被阅读0次

    生产者、消费者模式被广泛的运用在解耦、消息队列场景中。使用生产者和消费者模式,通常在他们之间增加一个阻塞队列作为媒介,有了媒介之后,相当于有了一个缓冲,平衡了两者之间的能力。

    生产者消费者模式设计

    如图所示,生产者线程会向阻塞队里面加数据,满了则阻塞,而消费者线程会向阻塞队里里面取出数据。阻塞队列的作用就是平衡图中3和4的过程。


    生产者消费者模式(图片来源于拉勾).png

    生产者、消费者模式的实现

    利用BlockingQueue实现

    代码如下所示,表面上看起来简单,实际上BlockingQueue给我们做了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。

    public class BlockingQueueDemo {
    
        public static void main(String[] args) {
            //创建ArrayBlockingQueue类型的BlockingQueue
            BlockingQueue<Object> arrayQueue = new ArrayBlockingQueue<>(10);
    
            Runnable producer = () -> {
                while (true) {
                    try {
                        //生产者往队列中存放数据
                        log.info("execute arrayQueue put......");
                        arrayQueue.put(new Object());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            //启动两个生产者线程
            new Thread(producer).start();
            new Thread(producer).start();
    
            Runnable customer = () -> {
                while (true) {
                    try {
                        log.info("execute arrayQueue take......");
                        //消费者从队列中取出数据
                        arrayQueue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            //启动两个消费者线程
            new Thread(customer).start();
            new Thread(customer).start();
        }
    }
    

    利用Condition实现

    @Slf4j
    public class BlockingQueueForCondition {
    
        private Queue queue;
        private int max = 16;
        private ReentrantLock lock = new ReentrantLock();
        //在lock锁的基础上创建两个条件,notEmptyCondition:队列没有空的条件,notFullCondition队列没有满的条件
        private Condition notEmptyCondition = lock.newCondition();
        private Condition notFullCondition = lock.newCondition();
    
        public BlockingQueueForCondition(int size) {
            this.max = size;
            queue = new LinkedList();
        }
    
        public void put(Object object) throws InterruptedException {
            //多线程场景需要一定的同步措施来保证线程安全,因此需要加锁操作
            lock.lock();
            try {
                //检测队列是否已经满了
                while (queue.size() == max) {
                    //此时队列已经满了,阻塞生产者线程并且释放lock锁
                    notFullCondition.await();
                }
                //如果没有满则往队列里面存放数据,并且调用notEmptyCondition.signalAll()通知正在等待的消费者,
                //并且唤醒等待的消费者
                queue.add(object);
                notEmptyCondition.signalAll();
            } finally {
                //最后释放锁,否则可能会产生无法释放锁的情况
                lock.unlock();
            }
        }
    
        public Object take() throws InterruptedException {
    
            try {
                while (queue.isEmpty()) {
                    notEmptyCondition.await();
                }
                Object item = queue.remove();
                notFullCondition.signalAll();
                return item;
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            BlockingQueueForCondition block = new BlockingQueueForCondition(20);
            for (int i = 0; i < 100; i++) {
                block.put(new Object());
            }
        }
    }
    

    利用Wait、Notify实现

    BlockingQueueForWaitNotify: 实现阻塞队列内部的put、take细节,以及线程的等待、通知和唤醒

    @Slf4j
    public class BlockingQueueForWaitNotify {
    
        private int maxSize;
        private LinkedList<Integer> storage;
        private final Random random = new Random();
    
        public BlockingQueueForWaitNotify(int size) {
            this.maxSize = size;
            storage = new LinkedList<>();
        }
    
        public synchronized void put(int item) throws InterruptedException {
            while (storage.size() == maxSize) {
                wait();
            }
            storage.add(item);
            notifyAll();
        }
    
        public synchronized int take() throws InterruptedException{
            while (storage.isEmpty()){
                wait();
            }
            int item = storage.remove();
            notifyAll();
            return item;
        }
    }
    

    BlockingQueueTest: 定义生产者、消费者线程测试阻塞队列

    @Slf4j
    public class BlockingQueueTest {
    
        public static void main(String[] args) {
            BlockingQueueForWaitNotify blockingQueue = new BlockingQueueForWaitNotify(20);
            Producer producer = new Producer(blockingQueue);
            Consumer consumer = new Consumer(blockingQueue);
            new Thread(producer).start();
            new Thread(consumer).start();
        }
    }
    
    @Slf4j
    class Producer implements Runnable {
    
        private BlockingQueueForWaitNotify blockingQueue;
    
        public Producer(BlockingQueueForWaitNotify blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        //    @SneakyThrows
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    log.info("produce begin to put");
                    blockingQueue.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    @Slf4j
    class Consumer implements Runnable {
    
        private BlockingQueueForWaitNotify blockingQueue;
    
        public Consumer(BlockingQueueForWaitNotify blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    int item = blockingQueue.take();
                    log.info("consumer take:{}", item);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    总结

    本文介绍了3种实现生产者、消费者模式的方法,分别是基于ArrayBlockingQueue、Condition、Wait和Notify来实现。利用jdk自带的ArrayBlockingQueue比较简单,它内部已经帮你实现了队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等逻辑,而利用Condition、Wait和Notify实现相当于我们写了BlockingQueue的一些内部细节,供生产者消费者使用。

    参考:
    1.拉勾教育《java并发编程78讲》
    2.https://github.com/nuptkwz/high-concurrency/tree/master/src/main/java/com/practice/concurrency/highconcurrency/producerandcustomer

    相关文章

      网友评论

          本文标题:常见的生产者消费者模式的实现

          本文链接:https://www.haomeiwen.com/subject/lnnunktx.html