美文网首页
ArrayBlockingQueue 深入分析

ArrayBlockingQueue 深入分析

作者: 街角下的蚂蚁 | 来源:发表于2020-08-15 13:07 被阅读0次
    • 导语

    ArrayBlockingQueue 是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。

    分析要点

    • 是否线程安全?
    • 数据结构是怎样的?
    • 怎么实现阻塞和非阻塞插入和获取元素?
    • 怎么实现插入和获取元素平衡的?
    • 应用与哪些场景?

    深入剖析

    成员变量

    // 元素存放的对象,可以看出其数据结构仍然为数组
    final Object[] items;
    // 获取下一个元素的数组下标
    int takeIndex;
    // 插入下一个元素的数组下标
    int putIndex;
    // 数组元素个数
    int count;
    // 独立的锁机制,后续详细讲解
    final ReentrantLock lock;
    // 定义非空条件锁
    private final Condition notEmpty;
    // 定义非满条件锁
    private final Condition notFull;
    

    构造函数

    public ArrayBlockingQueue(int capacity, boolean fair) {
     if (capacity <= 0)
      throw new IllegalArgumentException();
      this.items = new Object[capacity];
      lock = new ReentrantLock(fair);
      notEmpty = lock.newCondition();
      notFull =  lock.newCondition();
    }
    
    • 构造函数中初始化数组大小、锁、条件锁

    元素的插入

    • 队列的插入方式分为三种,非阻塞插入、超时阻塞插入、阻塞插入
    1. offer(),一旦队列容量已满,直接返回插入失败
    public boolean offer(E e) {
      checkNotNull(e);
      final ReentrantLock lock = this.lock;
      // 同步锁,元素插入是线程安全的
      lock.lock();
      try {
        // 一旦队列容量已满,则直接返回插入失败
        if (count == items.length)
          return false;
        else {
          // 插入元素的方法
          enqueue(e);
          return true;
        }
      } finally {
        lock.unlock();
      }
    }
    
    private void enqueue(E x) {
      final Object[] items = this.items;
      // 将元素x插入到数组putIndex位置
      items[putIndex] = x;
      // 如果putIndex等于了数组的大小,则将putIndex=0,从头开始插入,从这里可以看出ArrayBlockingQueue的实现是基于一个数组,从头到位不断的插入和获取元素
      if (++putIndex == items.length)
        putIndex = 0;
      // 元素个数+1  
      count++;
      // 发送队列不为空信号,可以唤醒调用了notEmpty.await()方法的线程,也是现实插入、获取阻塞方法的原理
      notEmpty.signal();
    }
    
    1. put(),一旦队列容量已满,则进入等待,直到队列可以插入元素为止
    public void put(E e) throws InterruptedException {
      checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
        // 如果当前数组容量已满,则线程进入阻塞等待,直到其他线程调用了notFull.signal()唤醒该线程
        while (count == items.length)
          notFull.await();
        // 线程被唤醒后,说明数组已经可以进行插入了  
        enqueue(e);
      } finally {
        lock.unlock();
      }
    }
    
    1. offer(E e, long timeout, TimeUnit unit),一旦队列容量已满,则进入指定的等待时长
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
      checkNotNull(e);
      // 将等待时间转化为纳秒
      long nanos = unit.toNanos(timeout);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
        while (count == items.length) {
          // 如果nanos返回值小于0,则代表等待超时,直接返回插入失败
          if (nanos <= 0)
            return false;
          // 阻塞相应的时间nanos
          nanos = notFull.awaitNanos(nanos);
        }
        // 进行元素插入
        enqueue(e);
        return true;
      } finally {
        lock.unlock();
      }
    }
    

    元素的获取

    • 队列获取元素,同样有三种方式,非阻塞获取、超时阻塞获取、阻塞获取
    1. poll(),当队列为空时,直接返回null
    public E poll() {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
        // count == 0代表队列为空,直接返回null
        return (count == 0) ? null : dequeue();
      } finally {
        lock.unlock();
      }
    }
    
    private E dequeue() { 
      final Object[] items = this.items;
      // 获取takeIndex位置的元素
      E x = (E) items[takeIndex];
      items[takeIndex] = null;
      // 如果下一个获取元素位置大于了数组大小,则从头开始获取
      if (++takeIndex == items.length)
        takeIndex = 0;
      // 元素总大小-1  
      count--;
      if (itrs != null)
        itrs.elementDequeued();
      notFull.signal();
      return x;
    }
    
    1. take(),一旦队列为空,则阻塞,直到从队列中获取到元素为止
    public E take() throws InterruptedException {
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
        while (count == 0)
          // 队列为空时,则阻塞,知道其他线程调用notEmpty.signal()唤醒该线程,然后获取元素返回
          notEmpty.await();
        return dequeue();
      } finally {
        lock.unlock();
      }
    }
    
    1. poll(long timeout, TimeUnit unit),一旦队列为空,则阻塞一段时间。同插入原理一样
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
      long nanos = unit.toNanos(timeout);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
        while (count == 0) {
          if (nanos <= 0)
            return null;
          nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
      } finally {
        lock.unlock();
      }
    }
    

    是否线程安全

    ArrayBlockingQueue在插入和获取元素的时候,都进行了锁,所以它是线程安全的

    应用场景

    • 由于基于数组,容量固定所以不容易出现内存占用率过高,但是如果容量太小,取数据比存数据的速度慢,那么会造成过多的线程进入阻塞(也可以使用offer()方法达到不阻塞线程), 此外由于存取共用一把锁,所以有高并发和吞吐量的要求情况下,我们也不建议使用ArrayBlockingQueue。

    更多细节,请阅读ArrayBlockingQueue源码。

    扫码关注了解更多

    相关文章

      网友评论

          本文标题:ArrayBlockingQueue 深入分析

          本文链接:https://www.haomeiwen.com/subject/bjhadktx.html