美文网首页
阻塞队列源码解析

阻塞队列源码解析

作者: a乐乐_1234 | 来源:发表于2017-07-16 22:24 被阅读0次

阻塞队列接口

BlockingQueue<E>的方法

既然是队列,说明遵循先进先出的规则(FIFO),肯定有入队和出队的方法,看了一下注释,有几种不同的出队入队方法,下面列举了一下:

| | 抛异常 | 返回值(null/true/false) | 阻塞 | 指定超时时间 |
|:------:|:-------- :|:---------:|:--------:|:-----------------------:|
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 删除 | remove() | poll() | take() | poll(time,unit) |
| 检查 | element() | peek() | | |

BlockingQueue的常见实现类
  • ArrayBlockingQueue<E>
    底层使用一个Object数组保存元素,自然能想到在初始化的时候必须指定最大容量。
//fair表示是否需要公平性,默认关闭,建议使用默认值
 public ArrayBlockingQueue(int capacity, boolean fair) {
      if (capacity <= 0)
           throw new IllegalArgumentException();
       // 初始化一个数组
       this.items = new Object[capacity];
       //创建ReentrantLock实例
       lock = new ReentrantLock(fair);
       //创建一个非空的条件对象
       notEmpty = lock.newCondition();
       //创建一个非满的条件对象
       notFull =  lock.newCondition();
   }
//入列
 private void enqueue(E x) {
      final Object[] items = this.items;
      items[putIndex] = x;//putIndex表示当前入队的元素索引
      if (++putIndex == items.length)
        putIndex = 0;
     count++;
     notEmpty.signal();//通知因notEmpty条件不满足阻塞的线程解除阻塞
  }
  //出列
  private E dequeue() {
      final Object[] items = this.items;
      @SuppressWarnings("unchecked")
      E x = (E) items[takeIndex];//takeIndex表示当前出列索引
      items[takeIndex] = null;
      if (++takeIndex == items.length)
          takeIndex = 0;
      count--;
      if (itrs != null)
          itrs.elementDequeued();
      notFull.signal();//通知因notFull条件不满足阻塞的线程解除阻塞
      return x;
    }
  //添加元素
  public void put(E e) throws InterruptedException {
     checkNotNull(e);//检查元素非空
     final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();//获取锁
     try {
         while (count == items.length)  //队列满了
             notFull.await();//当前线程进入条件等待集,直到其他线程调用notFull.signal方法解除阻塞
         enqueue(e);  //入列
     } finally {
         lock.unlock();  //解锁
     }
 }
//删除元素
public E take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();
     try {
         while (count == 0)  //队列为空
            notEmpty.await();  //当前线程进入条件等待集,直到其他线程调用notEmpty.signal方法解除阻塞
         return dequeue();  //出列
     } finally {
         lock.unlock();
    }
 }
  • 因为ArrayBlockingQueue是基于数组实现,初始化时会指定数组大小,传入参数不能是null,否则报NullPointerException。

  • 通过分析源码,ArrayBlockingQueue的实现核心是使用ReentrantLock(可重入互斥锁),为什么叫可重入呢?是因为线程是可以重复获得已经持有的锁。锁持有一个计数(hold count)来跟踪lock方法的嵌套调用。线程每调用一次lock方法后都要调用unlock来释放锁

  • 指定超时时间的两个方法offer和poll的实现原理是分别调用对应条件的awaitNanos(nanos),这个方法是指定等待时间,超过这个时间当前线程会自动唤醒,这这段时间内,其他线程调用了同种条件的signal或signalAll也会唤醒当前线程

  • LinkedBlockingQueue<E>

  • 基于单向链表(item,next)实现,默认容量最大为Integer.MAX_VALUE,也可以初始化是指定容量大小。
  • 与此类似的还有一个LinkedBlockingDeque,它里面维护的是一个双端链表(item,prev,next),它可实现先进后出(FILO)

相关文章

网友评论

      本文标题:阻塞队列源码解析

      本文链接:https://www.haomeiwen.com/subject/ecztkxtx.html