美文网首页JDK源代码
ArrayBlockingQueue源代碼解析(base jdk

ArrayBlockingQueue源代碼解析(base jdk

作者: 冰殇之刃 | 来源:发表于2018-03-19 20:28 被阅读0次

前记:

上篇文章写完之后,觉得代码贴的太多了,不过源代码

解析这种的,就是看源代码才有意思。主要是还是引导多思考,以
后写读源代码文章的博客的步骤,1.通过这段代码实现的功能
会先构思一下实现方案和代码架构,和一些其他的思考。2逐行
分析每个代码行的意思和意图。3.总结,对比其他类似的实现
或者其他的一些感悟。

正文:

ArrayBlockingQueue 是数组结构的堵塞队列的一种实现,那么肯定要实现的BlockingQueue接口。

解释一下接口含义
boolean add(E e); 队列添加元素,返回成功标识,队列满了抛出队列满的异常,无堵塞。
boolean offer(E e);队列添加元素,返回成功标识,无堵塞。
void put(E e);队列添加元素,无返回值,队列满了会堵塞。
boolean offer(E e, long timeout, TimeUnit unit);队列添加元素,队列满了堵塞,设置有超时时间。
E poll();队列拉取元素,无堵塞,没有值返回null。
E take();队列拉取元素,队列空了会堵塞,等待能拉取到值为止
E poll(long timeout, TimeUnit unit);队列拉取元素,队列空了等待,设置有等待超时时间
E peek() ; 只读队首元素的值,没有返回空
int remainingCapacity(); 计算剩余容量
boolean remove(Object o); 移除元素
int drainTo(Collection<? super E> c, int maxElements); 移除元素放到入参的集合当中
public Iterator<E> iterator() jdk 1.8以后ArrayBlockingQueue还增加了迭代器功能,这个模块下面会重点介绍,很有意思。
堵塞队列提供的功能:
在多线程环境下提供一个类似于生产者和消费者这样的一个模型
提供一个FIFO的顺序读取和插入
那就引起我的思考:

怎么实现的堵塞机制和堵塞的超时机制?
作为一个集合类,数组结构的怎么在多线程环境下实现安全扩容?
1.8jdk版本为什么会增加迭代器功能?
1.元素

/** 堵塞队列中存放的对象 /
final Object[] items;
/
* 消费者获取对象的下一个对象下标,具体的操作有poll take peek remove /
int takeIndex;
/
* 生产者放入对象的下一个对象的下标,具体的操作有 put offer add /
int putIndex;
/
* 队列中元素的数量 /
int count;
/
* 这个锁就是实现生产者,消费者模型的锁模型,并且所有和并发相关的堵塞控制都是通过这个锁来实现的/
final ReentrantLock lock;
/
* 这个是有ReentrantLock 中的Condition一个标识队列中有元素非空标志,用于通知消费者队列中有数据了,快来取数据 /
private final Condition notEmpty;
/
* 这个也是ReentrantLock 中的Condition的一个标识,标识队列中的元素不满用于通知生产者队列中空地,快来塞数据/
private final Condition notFull;
/
*

  • 这是一个迭代器集合,是之前没有的特性,
  • 细节:transient 标示变量是序列化忽略这个变量。那么为啥要这么做呢?迭代器都是new 出来的,即使保存再集合里面,别人也拿不到这个的引用。用不了,纯浪费空间。
    /
    transient Itrs itrs = null;
    /
    这个函数的意思是传入i返回i-1,因为是数组,空间已经固定了。
  • 可以将线性结构理解成环形结构,最前面的那个数再减就到了最后面了 特别有意思的一点是1.6之前有inc这个函数和这个功能刚 好相反,不是特别明白为啥干掉了。也许是觉得不安全,破坏结构?
    /
    final int dec(int i) {
    return ((i == 0) ? items.length : i) - 1;
    }
    /
    *
  • 迭代器的集合,链表形式

*/

class Itrs {

/**
 * 将里面的元素设置成弱引用,目标就是当成缓存使用的
 * Node里面存放的其实迭代器
 */
private class Node extends WeakReference<Itr> {
    Node next;
     Node(Itr iterator, Node next) {
        super(iterator);
        this.next = next;
    }
}
/** 记录循环的次数,当take下标到0的时候为一个循环 cycle+1 */
int cycles = 0;
/** Node的前节点 */
private Node head;
/** 用于删除无用的迭代器 */
private Node sweeper = null;
/***
 * 这个标识删除探针
 */
private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;
/**初始化函数注册迭代器到迭代器集合里面

Itrs(Itr initial) {
    register(initial);
}
/**
 * 清理itrs 整理旧的过期的迭代器 所谓过期的迭代器,是被标识为none 或者是Detached就是被取走的
  • 这个整理动作也是很有意思,普通是循环SHORT_SWEEP_PROBES次数,一旦发现有,那就会多循 环LONG_SWEEP_PROBES次数,尽力去寻找
    /
    void doSomeSweeping(boolean tryHarder) {
    int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
    Node o, p;
    final Node sweeper = this.sweeper;
    boolean passedGo; // to limit search to one full sweep
    if (sweeper == null) {
    o = null;
    p = head;
    passedGo = true;
    } else {
    o = sweeper;
    p = o.next;
    passedGo = false;
    }
    for (; probes > 0; probes--) {
    if (p == null) {
    if (passedGo)
    break;
    o = null;
    p = head;
    passedGo = true;
    }
    final Itr it = p.get();
    final Node next = p.next;
    //这个条件就是发现需要被清理的迭代器
    if (it == null || it.isDetached()) {
    //这个就是更努力的去清理
    probes = LONG_SWEEP_PROBES;
    //下面是清理动作,然后指针后移
    p.clear();
    p.next = null;
    if (o == null) {
    head = next;
    if (next == null) {
    //这就是迭代器已经遍历完了,然后函数返回了
    itrs = null;
    return;
    } }
    else o.next = next;
    } else o = p;
    p = next;
    }
    //sweeper 是开始清理的节点位置
    this.sweeper = (p == null) ? null : o;
    }
    /
    *
    • 注册逻辑的实现,在链表的最前面加元素
      */
      void register(Itr itr) {
      head = new Node(itr, head);
      }

/***

  • 迭代器

/
private class Itr implements Iterator<E> {
/
* 光标,是迭代器下一次迭代时的坐标,迭代器没有需要遍历的对象了,这个值会为负值/
private int cursor;
/
* 下一个元素内容,调用Iterator.next方法拿到的值 /
private E nextItem;
/
* 下一个元素的下标,none 是-1 被移除了是-2对应下面的static int /
private int nextIndex;
/
* 上一个元素的内容 /
private E lastItem;
/
* 上一个元素的的下标,none 是-1 被移除的是-2 同样对应下面的static int /
private int lastRet;
/
* 记录之前的开始遍历的下标,当这个迭代器判定为失效了这个值就是DETACHED /
private int prevTakeIndex;
/
记录之前循环次数的值,和Cycles进行比对,就知道有没有再循环过 /
private int prevCycles;
private static final int NONE = -1;
/
元素被调用remove方法移走,的状态/
private static final int REMOVED = -2;
/** 分离分开的Special value for prevTakeIndex indicating "detached mode" /
private static final int DETACHED = -3;
/
迭代器的初始化函数从takeIndex位置开始遍历/
Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
//队列里面没有值
if (count == 0) {
// assert itrs == null;
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
//队列首元素后一个
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
//注册到itrs,所有迭代器的集合,顺序注册的
itrs.register(this);
// 清理无用的迭代器
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
}
} finally {
lock.unlock();
}
}

方法:(只介绍复杂性,有代表性的)

/**

堵塞提交,超时返回false
/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//获取锁
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
//这里是使用同步队列器的超时机制,在nanos的时间范围内,方法会在这里堵塞,超过这个时间段nanos的值会被赋值为负数,方法继续,然后在下一个循环返回false。这个标志是未满标志,队列里面未满就可以放进元素嘛。然后判断成功就是一个入队列操作
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
/

  • 入队列操作,因为putIndex已经是当前该放入元素的下标了,放入元素之后,
  • 需要将putIndex+1,并且元素数量加1。然后直接调用非空标志通知等待中的消费者
  • 质疑:如果我没有等待中的消费者,那也要通知,那不是浪费么?
  • 解释:下端代码是signal的实现
    public final void signal() {
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
    doSignal(first)
    }
    signal方法已经在里面已经对队列的首元素判断空,不通知了,
    这个引起我的一个思考,确实在函数里面就应该对这些条件做判断要比外面判断更好一些,一个是更健壮,一个是更友好,但是这个最小作用模块还是功能模块,别一个调用链做了多次的这种条件的判断,这就让阅读者难受了。
    /
    private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
    putIndex = 0;
    count++;
    notEmpty.signal();
    }
    /
    **
  • poll的操作和offer基本一样,就是做的是出队列的操作。还有就是一个drainTo方法也很类似,有一个细节有意思就是drainTo是
    一个批量操作,但是通知却是一个一个通知的。没有调用singalAll()。因为堵塞队列强调一个顺序。一进一出原则。还有就是在外面判断了有无等待者。因为这*样却是省不必要的循环了。
    */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    while (count == 0) {
    if (nanos <= 0)
    return null;
    nanos = notEmpty.awaitNanos(nanos);
    }
    return dequeue();
    } finally {
    lock.unlock();
    }
    }

/**

  • 出队列操作,跟入队列操作正好是相反的,多了一个清理操作

/
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
//@key jdk1.8的新特性迭代器特性,这里是因为元素的出队列所以清理和这个元素相关联的迭代器
itrs.elementDequeued();
//对于生产者的通知
notFull.signal();
return x;
}
/
*

  • 根据下标移除元素,那么会分成两种情况一个是移除的是队首元素,一个是移除的是非队首元素,移除队首元素,就相当于出队列操作,移除非队首元素那么中间就有空位了,后面元素需要依次补上,然后如果是队尾元素,那么putIndex也就是插入操作的下标也就需要跟着移动。这里面同样有无用迭代器的清理和notFull标志的通知。elementDequeued 和removedAt 这两个函数差不多主要做的就是清理。但是不一样的是第一种情况当成出队列来处理了。而第二种就相当于这个元素就没有进过队列来处理,轻轻地来,轻轻地走不带走一片云彩
    /
    void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    //当移除的元素正好是队列首元素,就是take元素,正常的类似出队列的操作,
    if (removeIndex == takeIndex) {
    // removing front item; just advance
    items[takeIndex] = null;
    if (++takeIndex == items.length)
    takeIndex = 0;
    count--;
    if (itrs != null)
    itrs.elementDequeued();
    //
    } else {
    //因为是队列中间的值被移除了,所有后面的元素都要挨个迁移
    final int putIndex = this.putIndex;
    for (int i = removeIndex;;) {
    int next = i + 1;
    if (next == items.length)
    next = 0;
    if (next != putIndex) {
    items[i] = items[next];
    i = next;
    } else {
    items[i] = null;
    this.putIndex = I;
    break;
    }
    }
    count—;
    if (itrs != null)
    itrs.removedAt(removeIndex);
    }
    notFull.signal();
    }
    /
    *
    • 当元素出队列的时候调用的方法这个出队列方法
      /
      void elementDequeued() {
      // 在队列为空的时候调用清空所有的迭代器;
      if (count == 0)
      queueIsEmpty();
      // 当拿元素进行循环的时候,清理所有过期的迭代器
      else if (takeIndex == 0)
      takeIndexWrapped();
      }
      }
      /
      *
  • 因为takeIndex等于0了,意味着开始下一个循环了.
  • 然后通知所有的迭代器,删除无用的迭代器。
    /
    void takeIndexWrapped() {
    //循环了一次cycle加1
    cycles++;
    for (Node o = null, p = head; p != null;) {
    final Itr it = p.get();
    final Node next = p.next;
    //需要清理的条件,和清理代码
    if (it == null || it.takeIndexWrapped()) {
    p.clear();
    p.next = null;
    if (o == null)
    head = next;
    else
    o.next = next;
    } else {
    o = p;
    }
    p = next;
    }
    //没有迭代器了,就关掉迭代器的集合
    if (head == null) // no more iterators to track
    itrs = null;
    }
    /
    *这个takeIndexWrapped 是内部类Itr 的方法跟上面不是一个类的方法
    *这里就是判断这个迭代器所持有的元素还在队列里面么,那么有两个条件,1.isDetached()
  • 2.就是看这个的循环次数,比建立这个迭代器的时候的循环次数,如果大于1,说明发生过两次以上的循环
  • 拿里面的元素都换了个遍,拿肯定是不对了,拿这个迭代器就被关闭了。
  • @return true if this iterator should be unlinked from itrs
    /
    boolean takeIndexWrapped() {
    // assert lock.getHoldCount() == 1;
    if (isDetached())
    return true;
    if (itrs.cycles - prevCycles > 1) {
    // All the elements that existed at the time of the last
    // operation are gone, so abandon further iteration.
    shutdown();
    return true;
    }
    return false;
    }
    //将所有的标志位都标记成remove ,null
    void shutdown() {
    cursor = NONE;
    if (nextIndex >= 0)
    nextIndex = REMOVED;
    if (lastRet >= 0) {
    lastRet = REMOVED;
    lastItem = null;
    }
    prevTakeIndex = DETACHED;
    }
    /
    **
  • 迭代器的基本方法之一,获取下一个元素,会发生缓存器失效的情况,如果是缓存器失效了,能重组就重组,即从takeIndex开始遍历,如果不行就标记失效, *返回none
  • @return
    */
    public E next() {
    // assert lock.getHoldCount() == 0;
    final E x = nextItem;
    if (x == null)
    throw new NoSuchElementException();
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
    //当判定该迭代器失效了,会重组迭代器,以takeIndex为起点开始遍历,或者标记失效
    if (!isDetached())
    incorporateDequeues();
    lastRet = nextIndex;
    final int cursor = this.cursor;
    //cursor这个值会在incorporateDequeues方法中修改,
    if (cursor >= 0) {
    nextItem = itemAt(nextIndex = cursor);
    this.cursor = incCursor(cursor);
    } else {
    nextIndex = NONE;
    nextItem = null;
    }
    } finally {
    lock.unlock();
    }
    return x;
    }

/**

  • 发现元素发生移动,通过判定cycle等信息,然后cursor取值游标就重新从takeIndex开始
  • 下面如果发现所有记录标志的值发生变化,就直接清理本迭代器了。
  • */
    private void incorporateDequeues() {
    final int cycles = itrs.cycles;
    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
    final int prevCycles = this.prevCycles;
    final int prevTakeIndex = this.prevTakeIndex;
    if (cycles != prevCycles || takeIndex != prevTakeIndex) {
    final int len = items.length;
    // 从本迭代器建立开始,到目前堵塞队列出队列的个数,也就是takeIndex的偏移量
    long dequeues = (cycles - prevCycles) * len
    + (takeIndex - prevTakeIndex);
    // 判断所记录的last,next cursor 还是不是原值如果不是,这个迭代器就判定detach
    if (invalidated(lastRet, prevTakeIndex, dequeues, len))
    lastRet = REMOVED;
    if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
    nextIndex = REMOVED;
    if (invalidated(cursor, prevTakeIndex, dequeues, len))
    cursor = takeIndex;
    if (cursor < 0 && nextIndex < 0 && lastRet < 0)
    detach();
    else {
    //重新记录cycle值
    this.prevCycles = cycles;
    this.prevTakeIndex = takeIndex;
    }
    }
    }

回顾一下;

我介绍了ArrayblockingQueue其实是包含了两个部分一个是标准阻塞队列接口的实现。另一个是jdk1.8增加的迭代器。上一个满大街博客都能找的到,我就把接口描述了一下,然后介绍了两个还算是复杂一点的接口。和整个一个工作原理,没有太多使用case。主要是就是生产者和消费者模型。一个锁应用,和其他的JUC框架不一样。它什么操作都加锁,并发变串行。所以它就没有用到原子类修饰的共享变量。
关于迭代器部分好像是只有我这里有写。如果有百度上有看到相关ArrayBlockingQueue迭代器文章的请留言。毕竟我一家之言,还是有可能会有理解上的偏差。我们总结一下这个迭代器。首先跟别的设计一样,谁用谁new。这个不一样的是会增加一个注册到堵塞队列对象里面itrs上面。然后呢用了一个软引用,那么就GC可以回收避免内存溢出。然后会有对无用的迭代器的清理,类似于threadLocal那样。那么什么是无用的迭代器呢。标识无用就一个条件,我的迭代器标识的结点被覆盖了,因为它空间就这么大,举个例子一个大小5的堵塞队列。然后我建了一个迭代器,那么这个迭代器的下标就是0.然后迭代器我没有马上用,然后进出队列10次,那么之前节点的值已经被替换了。队列里面还有值,但是迭代器的值已经在take方法中被干掉了,已经失效了。判断条件就是cycle的循环次数。有兴趣可以好好了解一下,这应该是我看过的最复杂的迭代器了。
留一些问题:

1.这个迭代器为什么会比arrayList复杂这么多?

2.其实作为堵塞队列来说无非就是数据交换,拿有什么场景是需要迭代器的?而且本身就全都锁控制,效率就不高。还加入这么复杂的迭代模块。会更慢一些的?

这篇文章会看起来比较碎。尽力了。。没有整块的时间去写。而且没想这个迭代器这么复杂。花费我很多时间去研究(没错,这就是我脱稿的原因)

还有就是风格和上一篇不一样了。我希望可以让看这篇文章的人不光是可以学习到之前不知道的知识。也可以触发大家更多的去主动的思考,去思考模块的设计,功能的实现。而不是被动接受这篇文章所传递出来的内容。

还有就是看这种源码。一定要先框架,功能。摸透再去看细节。如果你对这个代码块所要完成的功能不够了解。拿看起来费劲。框架,功能这些都摸透了。再钻到细节上面去。我们可能用到的框架很多,拿要读的源代码那就太多了。其实阅读源代码我觉得是培养一个阅读代码的能力。一个是学习处理这种场景的解决方案,一个是学习编程风格,编码模式。还有就是可能会培养对编程、对探究的兴趣。毕竟工作不能只是为了赚钱。

相关文章

网友评论

    本文标题:ArrayBlockingQueue源代碼解析(base jdk

    本文链接:https://www.haomeiwen.com/subject/oiyrqftx.html