今天看一看BlockingQueue阻塞队列,怎么来实现阻塞的;
一、阻塞队列体系图
image.png二、主要属性
//元素数组
final Object[] items;
// 取元素的指针
int takeIndex;
// 放元素的指针
int putIndex;
// 元素数量
int count;
//集合锁
final ReentrantLock lock;
//非空条件等待队列
private final Condition notEmpty;
//未满条件等待队列
private final Condition notFull;
从上面主要属性我们大概可以看出,入队元素是放入数组中的,然后由取元素的指针指向队列头元素,放元素的指针指向队列尾元素;由锁来保证取放数据时的并发安全
三、源码分析
我们来跟着源码分析一遍,主要是看注释
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化数组,容量为capacity
this.items = new Object[capacity];
//初始化锁,true为公平锁,false为非公平锁
lock = new ReentrantLock(fair);
//初始化两个条件队列
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
构造方法先初始化了一些主要元素,我们再看看put方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//入队时,先加锁,如果被打断会抛异常出来
lock.lockInterruptibly();
try {
/*
*当队列已经放满时,则把当前线程放入,未满条件等待队列,挂起线程
*当线程被唤醒时,则再去看队列是否放满
*未放满,则元素入队,放满则再继续挂起
*/
while (count == items.length)
notFull.await();
//当队列未满时,则把元素入队,
enqueue(e);
} finally {
//添加完成释放锁
lock.unlock();
}
}
我们再看看take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//出队时,先加锁
lock.lockInterruptibly();
try {
/*
*当队列为空时,则把该线程放入非空条件等待队列,挂起该线程
*当线程被唤醒时,则再去看队列是否非空
*有值,则出队,没有值,则再次挂起该线程
*/
while (count == 0)
notEmpty.await();
//出队
return dequeue();
} finally {
lock.unlock();
}
}
到此,则主要逻辑分析完了,然后我们继续来看看,条件队列是怎么来挂起该线程的,本来应该是在上一节ReentrantLock源码分析中去讲该逻辑的,但是上次主要是讲ReentrantLock主流程,这次遇到了条件锁,那就正好来分析一遍;我们先来看看lock.newCondition() new出来的对象ConditionObject主要元素;
//条件队列的头节点
private transient Node firstWaiter;
//条件队列的尾节点
private transient Node lastWaiter;
再来看看调用的await()方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//创建该条件队列的节点
Node node = addConditionWaiter();
//该方法很明显就是ReentrantLock释放锁的逻辑,这里就不分析了
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断该节点是否在sync队列中,不在则挂起该线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//唤醒后,会去修改waitStatus属性,然后把条件队列的节点,移入sync队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
/*当该节点被唤醒时,这个可以看下后面signal方法后再回到这里
*因为唤醒后,该节点会被移入sync队列,所以才会跳出上面的循环
*acquireQueued这个方法正好也是ReentrantLock抢占锁资源的方法,这里就不分析了
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
//断开所有条件队列的指针,清空条件队列
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
//如果最后一个节点元素状态不是CONDITION,则说明该元素已经被移入sync队列,则清空该条件队列
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//新建一个CONDITION状态的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
//队列为空时,条件队列的头结点,指向该节点
firstWaiter = node;
else
//如果条件队列不为空,则把尾结点的下个元素指向该节点
t.nextWaiter = node;
//把尾节点指向该节点
lastWaiter = node;
return node;
}
final boolean isOnSyncQueue(Node node) {
//当前节点状态为CONDITION,或者上个节点为空,则说明不在sync队列上
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果节点sync队列的下个节点不为空,则说明,该节点在sync队列上
if (node.next != null) // If has successor, it must be on queue
return true;
//遍历sync队列,看是否能在sync队列上找到该节点,找到则返回true,否则返回false
return findNodeFromTail(node);
}
private int checkInterruptWhileWaiting(Node node) {
//线程没有被打断时,调用transferAfterCancelledWait,修改状态,并且把节点都移入到sync队列中
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
//CAS去修改节点状态,从CONDITION,修改成初始状态
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//从ReentrantLock中,我们知道这个是入队(sync队列)的操作
enq(node);
return true;
}
//该循环是为了防止同时有signal方法执行,上面抢占锁失败,必须等待该节点已经被移入到sync队列后才能跳出该方法
while (!isOnSyncQueue(node))
//线程让出CPU,等待下次执行
Thread.yield();
return false;
}
//从该方法看出,就是取消各种引用,就是当条件队列被移入sync队列后,所有的引用都被清空,好便于垃圾回收
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
条件队列等待方法就是这些,我们继续接着来看入队方法enqueue;
private void enqueue(E x) {
final Object[] items = this.items;
//把元素放入索引为‘放指针’的位置
items[putIndex] = x;
//放索引加一,当如果,指针已经到了数组尾部,则指针又回到数组头部,就是重置为0
if (++putIndex == items.length)
putIndex = 0;
//队列元素长度加一
count++;
//当元素已经放入,则通知非空条件队列,唤醒该队列线程可以
notEmpty.signal();
}
我们继续来看下signal通知方法
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获取到条件队列的第一个节点,开始唤醒
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
//遍历整个条件队列,修改状态,然后入sync队列,直到没有下一个节点为止
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//使用cas修改waitStatus状态,从CONDITION,设置成初始状态,这个地方可能会和await方法发生竞争,但是一般不会
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//修改后,就把该节点移入到sync队列中
Node p = enq(node);
int ws = p.waitStatus;
//该判断是在该线程被中断的情况下,修改后,会唤醒下个节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
put方法的整个流程我们就分析完了,我们再来分析下,take方法的dequeue,出队方法
private E dequeue() {
final Object[] items = this.items;
//获取数组下标为【取数据指针】 位置的数据
E x = (E) items[takeIndex];
//再把当前位置置位空,则出队完
items[takeIndex] = null;
//如果当前指针位置已经到了,数组尾,则需要跳到数组头,就相当一个圆
if (++takeIndex == items.length)
takeIndex = 0;
//取出值后,再把队列长度减一
count--;
if (itrs != null)
itrs.elementDequeued();
//当元素出队后,通知未满条件队列,唤醒线程,signal方法已经分析过了,这里可以看下上面的逻辑,联系起来
notFull.signal();
return x;
}
四、总结
该队列使用的是数组来存放数据,如果当数据放满后,使用put方法则,阻塞住该线程,把该线程移入到未满条件队列中,等待取数据后被唤醒;如果取数据时,队列为空,则把该线程放入非空条件队列,等待放数据后被唤醒;因为使用的是数组,用写指针和读指针来标记位置,如果到了队尾,则会重新跳到队头;相当于是一个环;
网友评论