代码演示:
public class ConditionTest implements Runnable {
private final static Lock lock = new ReentrantLock();
private final static Condition CONDITION = lock.newCondition();
public void test1() {
lock.lock();
try {
if (Thread.currentThread().getName().equals("t1")) {
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + "拿到锁了");
System.out.println(Thread.currentThread().getName() + "释放锁,等待通知");
CONDITION.await();
System.out.println(Thread.currentThread().getName() + "重新获得锁");
System.out.println(Thread.currentThread().getName() + "执行结束");
} else if (Thread.currentThread().getName().equals("t2")){
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + "拿到锁了");
System.out.println(Thread.currentThread().getName() + "发送通知");
CONDITION.signal();
System.out.println(Thread.currentThread().getName() + "执行结束,释放锁");
}
} catch (Exception e) {
System.out.println(e);
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionTest test = new ConditionTest();
Thread t1 = new Thread(test, "t1");
Thread t2 = new Thread(test, "t2");
t1.start();
Thread.sleep(300);
t2.start();
}
@Override
public void run() {
test1();
}
}
image.png
Condition作用原理
Condition内部是AQS的内部类利用条件队列实现阻塞和通知线程的效果。当一个线程在调用了await方法以后会被阻塞,调用signal方法唤醒。这种方式为线程提供了更加简单的等待/通知模式。一个Condition的实例必须与一个Lock绑定,因为Condition是作为AQS的内部类实现的,而Lock内部其实就是使用AQS实现的同步功能,因此Condition作用需要与Lock或者实现了AQS的类配合使用。
- await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
- await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
- awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
- awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态,该方法对中断不敏感。
- awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
- signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
- signalAll() :唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
等待:
Condition是AQS的内部类。每个Condition对象都包含一个队列(等待队列)。等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、以当前线程构造成节点并将节点从尾部加入等待队。如果不是通过 其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断interrupt(),则会抛出InterruptedException异常信息。
通知:
调用Condition的signal()方法,将会唤醒在等待队列中首节点,在唤醒节点前,会将节点移到同步队列中,在调用signal()方法之前必须先判断是否获取到了锁。接着获取等待队列的首节点,将其移动到同步队列并且利用LockSupport唤醒节点中的线程
Condition源码分析
await
ConditionObject 是AQS的内部类,实现了Condition接口
await()方法对中断敏感,线程中断标志位为true时调用await()就会抛出异常
//这个ConditionObject 是AQS的内部类
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
//这里表明await()方法对中断敏感,线程中断为true时调用await()就会抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去,这里如果lastWaiter不为CONDITION状态,那么会把它踢出Condition队列。
Node node = addConditionWaiter();
//释放node节点线程的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断节点是否在同步队列中,在则使用LockSupport.park将其挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去,这里如果lastWaiter不为CONDITION状态,那么会把它踢出Condition队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// 遍历队列,将状态不为CONDITION的节点剔除出队列
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//尾节点为空则将节点表明队列为空,将新节点设置为头节点
if (t == null)
firstWaiter = node;
else
//尾节点不为空则将节点表明队列不为空,将新节点设置为尾节点的后续节点
t.nextWaiter = node;
//将新节点设置为尾节点
lastWaiter = node;
return node;
}
遍历队列,将状态不为CONDITION的节点剔除出队列
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
//遍历队列
while (t != null) {
//遍历队列
Node next = t.nextWaiter;
//将状态不为CONDITION的节点清除
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
使用release确保线程释放
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取到锁代表getState()大于0
int savedState = getState();
//这里会确保线程释放
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
release利用tryRelease先进行释放锁,tryRelease是ReentrantLock继承AQS实现的方法,可以确保线程是获取到锁的,并且进行释放锁,unparkSuccessor主要是利用LockSupport.unpark(s.thread)唤醒线程,这里我之前在AQS的源码分析讲过,不懂的可以去看看
public final boolean release(int arg) {
//释放锁,这个方法是ReentrantLock继承AQS实现的方法
if (tryRelease(arg)) {
Node h = head;
//如果节点状态不是CANCELLED,也就是线程没有被取消,也就是不为0的,就进行唤醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这个方法主要是确保了当前线程持有的锁,不是则抛出异常,确保线程一定是获取到锁的线程,并且进行相应的释放锁
protected final boolean tryRelease(int releases) {
//将线程的state计时器减-1,state为0代表没有线程持有锁,大于0则代表有线程持有锁了
int c = getState() - releases;
//判断是否是当前线程持有的锁,不是则抛出异常,确保线程一定是获取到锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//将记录持有线程的变量置为空
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
isOnSyncQueue主要用于判断node是否在同步队列中
final boolean isOnSyncQueue(Node node) {
//判断节点的状态,如果状态是CONDITION,说明节点肯定不在同步队列中,同时哪怕同步队列是刚刚初始化的,也会有一个冗余的头节点存在,
//所以节点的前驱节点如果为null,那么节点也肯定不在同步队列中,返回fasle
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//节点的后继节点不为null,说明节点肯定在队列中,返回true,
//这里很重要的一点要明白,prev和next都是针对同步队列的节点
if (node.next != null)
return true;
//调用findNodeFromTail,查找node是否在同步队列中
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
//取得同步队列的队尾元素
Node t = tail;
//无限循环,从队尾元素一直往前找,找到相等的节点就说明节点在队列中,
//node为null了,说明前面已经没有节点可以找了,那就返回false
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
我们看到Condition挂起线程的手段是和AQS一样的,使用的依旧是 LockSupport的park方法,那么我们可以猜到signal使用的肯定是LockSupport的unpark方法
signal
signal的作用就是要将await()中Condition队列中第一个Node唤醒(signalAll唤醒全部Node)唤醒
isHeldExclusively是需要子类继承的,在lock中判断当前线程是否是获得锁的线程,证明了signal调用是需要在获取锁的情况下,这里是先会判断整个condition队列是否为空,不为空则获取Condition队列中第一个Node进行唤醒
public final void signal() {
//isHeldExclusively是需要子类继承的,在lock中判断当前线程是否是获得锁的线程,是则返回true,如何当前线程不是获取锁的线程则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获取Condition队列中第一个Node
Node first = firstWaiter;
//判断Condition队列是否为空
if (first != null)
doSignal(first);
}
这是ReentrantLock中的实现方法,判断当前线程是否是获得锁的线程,是则返回true
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
这个方法主要逻辑在transferForSignal
private void doSignal(Node first) {
do {
//头节点为空则,队列为空
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//将头结点从等待队列中移除
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal把头节点扔到同步队列中,然后使用LockSupport.unpark唤醒节点线程,enq(node)在AQS源码分析中讲过过来了,这里省略
final boolean transferForSignal(Node node) {
//通过CAS将状态为CONDITION节点的状态修改为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将该节点移入到同步队列中去,p是node的前缀节点
Node p = enq(node);
int ws = p.waitStatus;
//以下情况进行唤醒节点
//1、node的前缀节点状态为0或者节点状态不为零
//2、node的前缀节点状态修改为SIGNAL状态失败
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
网友评论