一、ConditionObject数据结构
简单回顾条件队列的数据结构,一个单链表。
// 条件队列所关心的Node结构
static final class Node {
static final int CANCELLED = 1; //线程已取消
static final int CONDITION = -2; //处于条件队列
volatile int waitStatus;
volatile Thread thread; //使该节点入队的线程。使用后为null
//如果在条件队列(只有在独占模式下才能访问)是一个单向链表,nextWaiter是单向链表的next指针。
Node nextWaiter;
Node(Thread thread, int waitStatus) { // 给Condition对象使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
//条件队列Condition数据结构
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1; // 线程被唤醒后产生中断,不抛异常
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1; // 线程在阻塞时被中断唤醒,抛异常
}
- 条件队列只有在独占模式下才能访问
- 在条件队列只有CONDITION和CANCELLED状态,node初始状态是CONDITION
- 条件队列使用的链表指针是nextWaiter字段
下面重点讲await等待方法和signal方法。。。
二、await可中断条件等待
线程持有独占锁才可以调用该方法,当线程进入同步队列后调用acquireQueued方法参与锁的竞争。
public final void await() throws InterruptedException { //调用这个方法的线程肯定是持有锁的
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //将当前线程加入当前Condition等待队列,顺便清除等待队列清除中所有状态不是CONDITION的node
int savedState = fullyRelease(node); //释放锁,并返回原本锁的state状态值
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//node进入同步队列后退出自旋,其他线程唤醒当前线程后会将当前线程加入同步队列
LockSupport.park(this);//阻塞当前线程,条件队列的线程await时都会被阻塞在这里等待其他线程唤醒或者中断唤醒
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;//不等于0表示中断,中断即退出
}//退出时,node已经处于同步队列,状态也不是CONDITION
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); //执行到这里一定是拿到锁了,清理等待队列中的已取消的node
if (interruptMode != 0)//拿到锁后才可以执行中断逻辑
reportInterruptAfterWait(interruptMode); //检查并执行抛异常,或者设置中断状态
}
- 条件队列中阻塞的线程可以被【中断唤醒】或者【signal方法唤醒】,有可能同时发生,问题在于怎么判定两者谁先发生
- 线程被唤醒后,都会调用checkInterruptWhileWaiting方法,来检查和处理【中断唤醒】
- 该方法内部会调用transferAfterCancelledWait方法,该方法根据CAS结果判断是否【中断唤醒】
- CAS成功,那么认为线程是【中断唤醒】,线程需要将自己放到同步队列
- CAS失败,说明signal同时发生且没有争过signal,那么认为线程是【signal方法唤醒】,线程需等待signal将线程放到同步队列后再返回
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
三、signal条件唤醒
将等待时间最长的线程(如果存在)从该条件的等待队列移动到拥有锁的等待队列
public final void signal() {
if (!isHeldExclusively()) //检查当前线程是否独占
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;//出条件队列,nextWaiter设置为null
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); //成功唤醒一个等待者或者没有等待者就会退出循环
}
直接看transferForSignal内部实现:
final boolean transferForSignal(Node node) {//将节点从条件队列转移到同步队列。如果成功则返回真,
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //可能与transferAfterCancelledWait竞争
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); //入队
int ws = p.waitStatus; //原尾节点的状态
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); //如果前驱节点被取消,或者修改前驱节点状态失败,直接唤醒线程
return true;
}
- 该方法会与上文的transferAfterCancelledWait方法竞争
- transferAfterCancelledWait与该方法的方法名都以transfer开头,都有转移节点的意思
- 如果transferForSignal是给Signal调用的,那么transferAfterCancelledWait方法意思就是给取消等待的线程调用的
- CAS竞争失败,直接返回唤醒节点失败
- CAS竞争成功,将线程入同步队列
- 如果前驱节点被取消,或者未把前驱节点状态设置为SIGNAL,说明node不能安全阻塞,需要直接唤醒
网友评论