Java 生产者消费者实现 —— BlockingQueue

作者: 被称为L的男人 | 来源:发表于2017-09-01 10:51 被阅读942次

    前言

    对着《Java 编程思想》,通过wait - notifyAll实现了生产者消费者模式。今天用BlockingQueue实现一下。

    BlockingQueue

    简单实现

    生产者和消费者,共用一个BlockingQueue。为什么BlockingQueue能够实现生产者-消费者模型呢?对于puttake两个操作,注释如下:

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;
    
    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    E take() throws InterruptedException;
    

    Apple.java,生产和消费的对象。

    public class Apple {
        
        private int id;
        
        public Apple(int id) {
            this.id = id;
        }
    
        public int getId() {
            return id;
        }
    
        public void setId(int id) {
            this.id = id;
        }
    
        @Override
        public String toString() {
            return "Apple [id=" + id + "]";
        }
    }
    

    生产者:

    public class Producer {
        BlockingQueue<Apple> queue;
        
        public Producer(BlockingQueue<Apple> queue) {
            this.queue = queue;
        }
        
        public boolean put(Apple apple) {
            return queue.offer(apple);
        }
    }
    

    消费者:

    public class Consumer {
        BlockingQueue<Apple> queue;
        
        public Consumer(BlockingQueue<Apple> queue) {
            this.queue = queue;
        }
        
        public Apple take() throws InterruptedException {
            return queue.take();
        }
    }
    

    测试:

    public class TestConsumer {
        
        public static void main(String[] args) {
    
            final BlockingQueue<Apple> queue = new LinkedBlockingDeque<Apple>(100);
            
            // 生产者
            new Thread(new Runnable() {
                
                int appleId = 0;
                Producer producer = new Producer(queue);
                
                @Override
                public void run() {
                    try {
                        while (true) {
                            TimeUnit.SECONDS.sleep(1);
                            producer.put(new Apple(appleId++)); 
                            producer.put(new Apple(appleId++)); 
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            // 消费者
            new Thread(new Runnable() {
                Consumer consumer = new Consumer(queue);
                
                @Override
                public void run() {
                    try {
                        while (true) {
                            System.out.println(consumer.take().getId());
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
    

    输出:

    生产者生产2个Apple,消费者立即消费掉。

    改进

    上述代码存在一些问题:

    • 生产者和消费者,都仅用于特定的类型Apple
    • 在使用过程中,需要自己定义BlockingQueue,自行实现生产者和消费者的线程,使用复杂
    • 如果要定义多个消费者线程,需要多次手动编写代码
    • 生产者并没有专注自身的功能:存储要消费的对象
    • 消费者并没有专注自身的功能:取出对象、如何消费对象

    改进后的代码如下:

    Apple类未更改。

    Producer变为抽象类,并使用泛型。里面新增线程池,用于运行消费者线程。

    public abstract class Producer<E> {
        protected BlockingQueue<E> queue;
        protected ExecutorService threadPool = Executors.newCachedThreadPool();
        public static final int DEFAULT_QUEUE_LENGTH = 10000;
        
        public Producer(int capacity) {
            initQueue(capacity);
        }
        
        public BlockingQueue<E> getQueue() {
            return queue;
        }
    
        public void setQueue(BlockingQueue<E> queue) {
            this.queue = queue;
        }
    
        public boolean put(E apple) {
            return queue.offer(apple);
        }
        
        private void initQueue(int capacity) {
            if (queue == null) {
                synchronized (this) {
                    if (queue == null) {
                        queue = new LinkedBlockingDeque<E>(capacity < 0 ? DEFAULT_QUEUE_LENGTH : capacity);
                    }
                }
            }
        }
        
        protected void consumerThread(int consumerCount, Consumer<E> consumer) {
            for (int i = 0; i < consumerCount; i++) {
                threadPool.execute(consumer);
            }
        }
    }
    

    Consumer也变成抽象类,使用泛型,并实现了Runnable接口。其中run方法的实现逻辑是:从阻塞队列中取出一个对象,并调用抽象方法consume。该方法是具体的消费者实现的消费逻辑。

    public abstract class Consumer<E> implements Runnable{
        BlockingQueue<E> queue;
        
        /**
         * 数据逐个处理
         * @param data
         */
        protected abstract void consume(E data);
        
        @Override
        public void run() {
            while (true) {
                try {
                    E data = take();
                    try {
                        consume(data);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public Consumer(BlockingQueue<E> queue) {
            this.queue = queue;
        }
        
        public E take() throws InterruptedException {
            return queue.take();
        }
    }
    

    AppleProducer:Apple的生产者,使用非延迟加载的单例模式,指定阻塞队列的长度、消费者线程数量。

    public class AppleProducer extends Producer<Apple>{
        
        // 并没有延迟加载
        public static AppleProducer INSTANCE = new AppleProducer(DEFAULT_QUEUE_LENGTH, 1); 
    
        private AppleProducer(int capacity, int consumerCount) {
            super(capacity);
            AppleConsumer consumer = new AppleConsumer(queue);
            consumerThread(consumerCount, consumer);
        }
    }
    

    AppleConsumer:Apple的消费者,要实现具体的消费方法consume。这里只是在控制台输出对象信息。

    public class AppleConsumer extends Consumer<Apple>{
    
        public AppleConsumer(BlockingQueue<Apple> queue) {
            super(queue);
        }
    
        @Override
        protected void consume(Apple data) {
            System.out.println(data);
        }
    }
    

    测试:这里只需要获取AppleProducer,调用put方法添加对象即可!在队列中有对象Apple时,会有线程取出Apple,自动调用AppleConsumer的consume方法。

    public class TestConsumer {
        
        public static void main(String[] args) throws InterruptedException {
    
            AppleProducer producer = AppleProducer.INSTANCE;
            for (int i = 0; i < 60; i++) {
                producer.put(new Apple(i));
            }
        }
    }
    

    有待改进的地方

    • 并没有面向接口编程,仍然是通过继承来实现的,代码有耦合(但是也不能算是缺点吧)
    • 阻塞队列直接使用LinkedBlockingDeque,并不够灵活(PriorityBlockingQueue等)
    • 对于线程,并没有好的名字,调试等并不直观
    • 如果有多个生产者-消费者,例如增加了Banana,管理仍然不够直观。可以增加一个方法,能够打印出所有的生产者-消费者

    相关文章

      网友评论

        本文标题:Java 生产者消费者实现 —— BlockingQueue

        本文链接:https://www.haomeiwen.com/subject/rvmhjxtx.html