美文网首页Java 杂谈java多线程
Java 多线程(九):ArrayBlockingQueue 与

Java 多线程(九):ArrayBlockingQueue 与

作者: 聪明的奇瑞 | 来源:发表于2018-07-11 00:40 被阅读16次

    什么是阻塞队列?

    • 阻塞队列与我们平常接触到的普通队列(ArrayList)的最大不同点在于阻塞队列的添加和删除方法都是阻塞的
      • 阻塞添加:当阻塞队列元素已满时,队列会阻塞加入元素的线程,直到队列元素不满时才重新唤醒线程执行元素加入操作
      • 阻塞删除:当队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作

    BlockingQueue

    • BlockingQueue 能够解决多线程中如何高效安全的传输数据的问题,帮助我们快速搭建高质量的多线程程序
    • 通过队列,可以使得数据由队列的一端输入,从另一端输出
    • 适用场景:生产者线程在一端生成,消费者线程在另一端消费
    BlockingQueue

    BlockingQueue 主要方法

    • 插入方法:
      • add(E e):
        • 将元素插入到队尾(如果立即可行且不会超过该队列的容量)
        • 成功返回 true,失败抛 IllegalStateException 异常
      • offer(E e,long timeout,TimeUnit unit):
        • 将元素插入到队尾(如果立即可行且不会超过该队列的容量)
        • 可设置超时时间,该方法可以中断
        • 成功返回 true,如果队列已满,返回 false
      • put(E e):
        • 将元素插入到队尾,如果队列已满则一直等待(阻塞)
    • 删除方法:
      • remove(Object o):
        • 移除指定元素,成功返回 true,失败返回 false
      • poll(long timeout, TimeUnit unit):
        • 获取并移除此队列的头元素,在指定等待的时间前一直等到获取元素
      • take():
        • 获取并移除队列头元素,若没有元素则一直阻塞
    • 检查方法:
      • element():
        • 获取但不移除队列的头元素,没有元素则抛异常
      • peek():
        • 获取但不移除队列的头,若队列为空则返回 null

    BlockingQueue 基本使用

    • 该例子中使用 ArrayBlockingQueue,生产者(Producer)将字符串插入共享队列中,消费者将它们取出
    public class BlockingQueueExample {
        public static void main(String[] args) throws Exception {
            BlockingQueue queue = new ArrayBlockingQueue(3);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer).start();
            new Thread(consumer).start();
            Thread.sleep(4000);
        }
    }
    
    • 生产者通过 put() 方法将元素插入共享队列中
    public class Producer implements Runnable {
    
        private BlockingQueue queue;
    
        public Producer(BlockingQueue queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                queue.put("1");
                Thread.sleep(1000);
                queue.put("2");
                Thread.sleep(1000);
                queue.put("3");
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    
    • 消费者通过 take() 方法从队列中取出元素,take() 方法会阻塞直到获取元素为止
    public class Consumer implements Runnable {
    
        private BlockingQueue queue = null;
    
        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                System.out.println(queue.take());
                System.out.println(queue.take());
                System.out.println(queue.take());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    

    BlockingQueue 接口实现类与源码分析

    ArrayBlockingQueue

    • 分析:
      • 基于数组的阻塞队列实现,内部维护了一个数组用于缓存队列中的数据对象
      • 有两个 Integer 类型的索引,指向添加、获取下一个元素的位置的索引
      • 并通过一个重入锁 ReentrantLock 和两个 Condition 条件来实现阻塞
    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        /** 存储数据的数组 */
        final Object[] items;
    
        /**获取数据的索引,主要用于take,poll,peek,remove方法 */
        int takeIndex;
    
        /**添加数据的索引,主要用于 put, offer, or add 方法*/
        int putIndex;
    
        /** 队列元素的个数 */
        int count;
    
    
        /** 控制并非访问的锁 */
        final ReentrantLock lock;
    
        /**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作 */
        private final Condition notEmpty;
    
        /**notFull条件对象,用于通知put方法队列未满,可执行添加操作 */
        private final Condition notFull;
    
        /**
           迭代器
         */
        transient Itrs itrs = null;
    
    }
    

    image
    • 分析:
      • add 方法实际上是调用了 offer 方法
      • enqueue(E x) 方法内部通过 putIndex 索引直接将元素添加到数组 item 中,当 putIndex 索引大小等于数组长度时,需要将 putIndex 重新设置为 0,这是因为当前队列元素总是从队头获取,从队尾添加
    //add方法实现,间接调用了offer(e)
    public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    
    //offer方法
    public boolean offer(E e) {
         checkNotNull(e);//检查元素是否为null
         final ReentrantLock lock = this.lock;
         lock.lock();//加锁
         try {
             if (count == items.length)//判断队列是否满
                 return false;
             else {
                 enqueue(e);//添加元素到队列
                 return true;
             }
         } finally {
             lock.unlock();
         }
     }
    
    //入队操作
    private void enqueue(E x) {
        //获取当前数组
        final Object[] items = this.items;
        //通过putIndex索引对数组进行赋值
        items[putIndex] = x;
        //索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;//队列中元素数量加1
        //唤醒调用take()方法的线程,执行元素获取操作。
        notEmpty.signal();
    }
    

    SJziX.md.png
    • 分析:
      • put 方法是一个阻塞方法,如果队列元素已满,那么当前线程将会被 notFull 条件对象挂起加入到等待队列中,直到有空挡才会唤醒执行添加操作
    //put方法,阻塞时可中断
     public void put(E e) throws InterruptedException {
         checkNotNull(e);
          final ReentrantLock lock = this.lock;
          lock.lockInterruptibly();//该方法可中断
          try {
              //当队列元素个数与数组长度相等时,无法添加元素
              while (count == items.length)
                  //将当前调用线程挂起,添加到notFull条件队列中等待唤醒
                  notFull.await();
              enqueue(e);//如果队列没有满直接添加。。
          } finally {
              lock.unlock();
          }
      }
    

    LinkedBlockingQueue

    • 基于链表的阻塞队列,内部维护着一个链表构成的缓冲队列,用于缓存队列中的数据对象
    • 在正常情况下链表阻塞队列的吞吐量要高于数组的阻塞队列(ArrayBlockingQueue),因为其内部实现添加和删除操作使用了两个 ReentrantLock 来控制并发执行(插入、获取各有一个锁),而 ArrayBlockingQueue 内部只使用一个 ReentrantLock 控制并发
    • 它与 ArrayBlockingQueue 的 API 几乎一致但内部实现原理不太相同
    • 当创建一个 LinkedBlockingQueue 时,默认阻塞队列中元素的数量大小为 Interger.MAX_VALUE

    LinkedBlockingQueue 和 ArrayBlockingQueue 的区别

    • 队列大小有所不同:ArrayBlockingQueue 必须指定队列大小,而 LinkedBlockingQueue 默认为 Integer.MAX_VALUE(当元素添加速度大于移除速度时,需要注意一下,以免内存溢出)
    • 实现结构不同:ArrayBlockingQueue 采用数组实现、而 LinkedBlockingQueue 采用链表实现
    • 由于 ArrayBlockingQueue 采用数组存储队列元素,因此再插入、删除元素时不会产生或销毁任何额外的对象实例,而 LinkedBlockingQueue 每次插入都会生成一个新的结点(Node)对象,这会影响日后 GC 垃圾回收
    • ArrayBlockingQueue 中添加、删除操作只使用一个锁(ReentrantLock),而 LinkedBlockingQueue 添加、删除操作各使用一个锁,因此 LinkedBlockingQueue 的并发吞吐量大于 ArrayBlockingQueue

    相关文章

      网友评论

        本文标题:Java 多线程(九):ArrayBlockingQueue 与

        本文链接:https://www.haomeiwen.com/subject/nmdcpftx.html