美文网首页Java
Java生产者和消费者实现

Java生产者和消费者实现

作者: zzzhouzhong | 来源:发表于2019-03-17 14:28 被阅读0次

    0. 前言

    生产者消费者是考察多线程的常见问题。最近尝试手写生产者和消费者时,发现这个问题并不止是考察多线程,还可以考察泛型、设计模式等。这里总结下如何手写生产者消费者。

    1. 预备知识

    • Java多线程基本知识
    • 泛型
    • 简单的设计模式

    2. 设计与整体架构

    生产者消费者模式,需要生产者生产产品,消费者消耗产品。因此围绕着产品,我们需要:

    • 产品
    • 容器:盛放产品,既生产者生产后放入容器中,消费者到容器中取出产品。本文使用数据结构中队列(Queue)——尾部插入,头部取出
    • 生产者:生产产品
    • 消费者:消费产品
    • 线程池:生产者和消费者跑在不同线程中,用过同步机制实现生产和消费。

    分析完,上代码。

    ↓面向抽象(接口)的编程,无论如何逼格先起来~~

        //产品接口
        interface Product<T> {
            T get();
        }
    
        //生产者接口
        interface Producer<T> {
            Product<T> produce();
        }
    
        //消费者接口
        interface Consumer<T> {
            void consume(Product<T> product);
        }
    

    然后是各个实现类,以产生long为例:↓

    //产品类
        static class ProductImpl<T> implements Product<T> {
            private T data;
    
            ProductImpl(T data) {
                this.data = data;
            }
    
            @Override
            public T get() {
                return data;
            }
        }
    
        //生产者类
        static class LongProducer implements Producer<Long> {
            @Override
            public Product<Long> produce() {
                costTime(3000);
                long data = System.currentTimeMillis();
                System.out.println("produce:" + data);
                return new ProductImpl<>(data);
            }
        }
    
        //消费者类
        static class LongConsumer implements Consumer<Long> {
            @Override
            public void consume(Product<Long> product) {
                costTime(2000);
                System.out.println("consume:" + product.get());
            }
        }
    
        //模拟生产或消费时花费的时间
        private static void costTime(int sleepTime) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    

    ↓实际上生产者和消费者需要跑在Thread中,因此我们还需要用Runnable包裹一下,并循环生产/消费。利用代理模式,我们达到一个类只做一件事情的目的。其中produceOnce()consumeOnce()待实现

        //利用修饰模式or代理模式包裹的生产者,并实现Runnable接口达到循环生产的目的
        static class RunnableProducer<T> implements Producer<T>, Runnable {
            Producer<T> delegate;
            final Queue<Product<T>> queue;
            volatile boolean canceled = false;
    
            public void cancel() {
                canceled = true;
            }
    
            public RunnableProducer(Producer<T> producer, Queue<Product<T>> queue) {
                this.delegate = producer;
                this.queue = queue;
            }
    
            @Override
            public Product<T> produce() {
                return delegate.produce();
            }
    
            @Override
            public void run() {
                while (!canceled) {
                    produceOnce();
                }
            }
    
            private void produceOnce() {
                // TODO: 19/3/18
            }
        }
    
        //利用修饰模式or代理模式包裹的消费者,并实现Runnable接口达到循环消费的目的
        static class RunnableConsumer<T> implements Consumer<T>, Runnable {
            Consumer<T> delegate;
            final Queue<Product<T>> queue;
            volatile boolean canceled = false;
    
            public void cancel() {
                canceled = true;
            }
    
            public RunnableConsumer(Consumer<T> consumer, Queue<Product<T>> queue) {
                this.delegate = consumer;
                this.queue = queue;
            }
    
            @Override
            public void consume(Product<T> product) {
                delegate.consume(product);
            }
    
            @Override
            public void run() {
                while (!canceled) {
                    consumeOnce();
                }
            }
    
            private void consumeOnce() {
                // TODO: 19/3/18  
            }
        }
    

    ↓最后写下测试代码,使用LinkedList作为Queue,跑在ThreadPool中。至此整体架构就完成了。

        public static void main(String[] args) {
            Consumer<Long> consumer = new LongConsumer();
            Producer<Long> producer = new LongProducer();
            testSimpleProducerConsumer(producer, consumer);
        }
    
        private static <T> void testSimpleProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
            Queue<Product<T>> queue = new LinkedList<>();
            RunnableConsumer<T> realConsumer = new RunnableConsumer<>(consumer, queue);
            RunnableProducer<T> realProducer = new RunnableProducer<>(producer, queue);
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(realConsumer);
            executorService.execute(realProducer);
            costTime(30 * 1000);
            executorService.shutdown();
            realConsumer.cancel();
            realProducer.cancel();
        }
    

    3. 实现


    3.1 PlanA: Object的 wait()notify()

    常用的同步/互斥就是Objectwait()notify()系列方法了。我们先用它来实现生产和消费的同步。

            private void produceOnce() {
                synchronized (queue) {
                    try {
                        //先查询,如果有未消费的就wait(),直到消费者消费完成后发送notify()唤醒
                        while (queue.peek() != null) {
                            queue.wait();
                        }
                        //唤醒后执行生产
                        queue.add(produce());
                        //生产后发送notify()唤醒消费者
                        queue.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            private void consumeOnce() {
                synchronized (queue) {
                    try {
                        //先查询,如果没有产品就wait()。直到生产者生产后发送notify()唤醒
                        while (queue.peek() == null) {
                            queue.wait();
                        }
                        //唤醒后执行消费
                        consume(queue.poll());
                        //消费完后发送notify()唤醒生产者
                        queue.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
    

    注释都写得很明白啦。跑一波看看结果。

    image.png
    总结:使用Objectwait()notify()需要自己实现生产者和消费者的配合,要小心各种逻辑的处理。下面使用可阻塞的BlockingQueue代替Queue,实现起来更简单,也更不容易出错。

    3.2 PlanB:使用BlockingQueue

            //使用BlockingQueue代替Queue,实现可阻塞的队列
            BlockingQueue<Product<T>> queue;
            private void produceOnce() {
                try {
                    Product<T> produce = produce();
                    queue.put(produce);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            private void ConsumeOnce() {
                try {
                    Product<T> product = queue.take();
                    consume(product);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        private static <T> void testBetterProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
            //使用SynchronousQueue实现同步队列,当然也可以选择其他BlockingQueue的实现,但有各种不同的特点
            BlockingQueue<Product<T>> queue = new SynchronousQueue<>();
            BetterRunnableProducer<T> realProducer = new BetterRunnableProducer<>(producer, queue);
            BetterRunnableConsumer<T> realConsumer = new BetterRunnableConsumer<>(consumer, queue);
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(realProducer);
            executorService.execute(realConsumer);
            costTime(30 * 1000);
            executorService.shutdown();
            realConsumer.cancel();
            realProducer.cancel();
        }
    
        public static void main(String[] args) {
            Consumer<Long> consumer = new LongConsumer();
            Producer<Long> producer = new LongProducer();
    //        testSimpleProducerConsumer(producer, consumer);
            testBetterProducerConsumer(producer, consumer);
        }
    

    ↓跑一把


    image.png

    4. 总结

    手写生产者消费者看似考察多线程知识,但这既是挑战也是机遇,如果用心思考,可以展示你更多的能力。本文不止是多线程知识,还有设计模式(面向接口的抽象思想,接口隔离原则,单一职责原则,代理模式,装饰器模式等)、泛型的使用等。另外,我们看到RunnableConsumerBetterRunnableConsumer中有大量重复代码,因此可以再抽象一层出来。而且他们都依赖了Queue,并且是构造方法中传入的依赖。这其实并不好,可以使用依赖倒置原则(依赖注入)进一步优化。

    5. 其他

    参考:
    https://www.jianshu.com/p/e29632593057
    这边文章还提到了了错过notify信号wait条件变化唤醒同类导致“假死”状态等。

    6. 代码

    package com.zz.multithreaddemo;
    
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.SynchronousQueue;
    
    public class ProducerConsumerDemo {
    
        public static void main(String[] args) {
            Consumer<Long> consumer = new LongConsumer();
            Producer<Long> producer = new LongProducer();
            testSimpleProducerConsumer(producer, consumer);
    //        testBetterProducerConsumer(producer, consumer);
        }
    
        private static <T> void testSimpleProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
            Queue<Product<T>> queue = new LinkedList<>();
            RunnableConsumer<T> realConsumer = new RunnableConsumer<>(consumer, queue);
            RunnableProducer<T> realProducer = new RunnableProducer<>(producer, queue);
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(realConsumer);
            executorService.execute(realProducer);
            costTime(10 * 1000);
            executorService.shutdown();
            realConsumer.cancel();
            realProducer.cancel();
        }
    
    
        private static <T> void testBetterProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
            //使用SynchronousQueue实现同步队列,当然也可以选择其他BlockingQueue的实现,但有各种不同的特点
            BlockingQueue<Product<T>> queue = new SynchronousQueue<>();
            BetterRunnableProducer<T> realProducer = new BetterRunnableProducer<>(producer, queue);
            BetterRunnableConsumer<T> realConsumer = new BetterRunnableConsumer<>(consumer, queue);
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(realProducer);
            executorService.execute(realConsumer);
            costTime(10 * 1000);
            executorService.shutdown();
            realConsumer.cancel();
            realProducer.cancel();
        }
    
    
        //产品接口
        interface Product<T> {
            T get();
        }
    
        //生产者接口
        interface Producer<T> {
            Product<T> produce();
        }
    
        //消费者接口
        interface Consumer<T> {
            void consume(Product<T> product);
        }
    
        //产品类
        static class ProductImpl<T> implements Product<T> {
            private T data;
    
            ProductImpl(T data) {
                this.data = data;
            }
    
            @Override
            public T get() {
                return data;
            }
        }
    
        //生产者类
        static class LongProducer implements Producer<Long> {
            @Override
            public Product<Long> produce() {
                costTime(3000);
                long data = System.currentTimeMillis();
                System.out.println("produce:" + data);
                return new ProductImpl<>(data);
            }
        }
    
        //消费者类
        static class LongConsumer implements Consumer<Long> {
            @Override
            public void consume(Product<Long> product) {
                costTime(2000);
                System.out.println("consume:" + product.get());
            }
        }
    
        //模拟生产或消费时花费的时间
        private static void costTime(int sleepTime) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    
        //利用修饰模式or代理模式包裹的生产者,并实现Runnable接口达到循环生产的目的
        static class RunnableProducer<T> implements Producer<T>, Runnable {
            Producer<T> delegate;
            final Queue<Product<T>> queue;
            volatile boolean canceled = false;
    
            public void cancel() {
                canceled = true;
            }
    
            public RunnableProducer(Producer<T> producer, Queue<Product<T>> queue) {
                this.delegate = producer;
                this.queue = queue;
            }
    
            @Override
            public Product<T> produce() {
                return delegate.produce();
            }
    
            @Override
            public void run() {
                while (!canceled) {
                    produceOnce();
                }
            }
    
            private void produceOnce() {
                synchronized (queue) {
                    try {
                        //先查询,如果有未消费的就wait(),直到消费者消费完成后发送notify()唤醒
                        while (queue.peek() != null) {
                            queue.wait();
                        }
                        //唤醒后执行生产
                        queue.add(produce());
                        //生产后发送notify()唤醒消费者
                        queue.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        //利用修饰模式or代理模式包裹的消费者,并实现Runnable接口达到循环消费的目的
        static class RunnableConsumer<T> implements Consumer<T>, Runnable {
            Consumer<T> delegate;
            final Queue<Product<T>> queue;
            volatile boolean canceled = false;
    
            public void cancel() {
                canceled = true;
            }
    
            public RunnableConsumer(Consumer<T> consumer, Queue<Product<T>> queue) {
                this.delegate = consumer;
                this.queue = queue;
            }
    
            @Override
            public void consume(Product<T> product) {
                delegate.consume(product);
            }
    
            @Override
            public void run() {
                while (!canceled) {
                    consumeOnce();
                }
            }
    
            private void consumeOnce() {
                synchronized (queue) {
                    try {
                        //先查询,如果没有产品就wait()。直到生产者生产后发送notify()唤醒
                        while (queue.peek() == null) {
                            queue.wait();
                        }
                        //唤醒后执行消费
                        consume(queue.poll());
                        //消费完后发送notify()唤醒生产者
                        queue.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        static class BetterRunnableProducer<T> implements Producer<T>, Runnable {
            //使用BlockingQueue代替Queue,实现可阻塞的队列
            BlockingQueue<Product<T>> queue;
            Producer<T> delegate;
            volatile boolean canceled = false;
    
            public void cancel() {
                canceled = true;
            }
    
            BetterRunnableProducer(Producer<T> producer, BlockingQueue<Product<T>> queue) {
                this.delegate = producer;
                this.queue = queue;
            }
    
            @Override
            public Product<T> produce() {
                return delegate.produce();
            }
    
            @Override
            public void run() {
                while (!canceled) {
                    produceOnce();
                }
            }
    
            private void produceOnce() {
                try {
                    Product<T> produce = produce();
                    queue.put(produce);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class BetterRunnableConsumer<T> implements Consumer<T>, Runnable {
            BlockingQueue<Product<T>> queue;
            Consumer<T> delegate;
            volatile boolean canceled = false;
    
            public void cancel() {
                canceled = true;
            }
    
            BetterRunnableConsumer(Consumer<T> consumer, BlockingQueue<Product<T>> queue) {
                this.delegate = consumer;
                this.queue = queue;
            }
    
            @Override
            public void consume(Product<T> product) {
                delegate.consume(product);
            }
    
            @Override
            public void run() {
                while (!canceled) {
                    ConsumeOnce();
                }
            }
    
            private void ConsumeOnce() {
                try {
                    Product<T> product = queue.take();
                    consume(product);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        }
    }
    
    

    相关文章

      网友评论

        本文标题:Java生产者和消费者实现

        本文链接:https://www.haomeiwen.com/subject/qelumqtx.html