Condition核心原理分析
java内置锁的wait/notify
实现了等待唤醒机制,那么实现自定义的锁时自然是少不了同样的功能,那么借助AQS
实现自定义锁时该如何做呢,或者JUC提供了什么样的机制。
那Condition
就是实现同步等待通知的最佳利器了。看看具体是如何实现的。
public interface Condition {
void await();
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout);
boolean await(long time, TimeUnit unit);
boolean awaitUntil(Date deadline);
void signal();
void signalAll();
}
方法名 | 描叙 |
---|---|
void await() | 进入等待,直到被唤醒 |
void awaitUninterruptibly() | 进入等待,知道被唤醒,但可以被中断 |
long awaitNanos(long nanosTimeout) | 进入等待,超时或者被唤醒(默认单位为纳秒) |
boolean await(long time, TimeUnit unit) | 进入等待,超时或者被唤醒 |
boolean awaitUntil(Date deadline) | 进入等待,如果到达最后期限或者被唤醒或中断则返回 |
void signal() | 唤醒一个等待的线程 |
void signalAll() | 唤醒所有等待的线程 |
代码一目了然,无非就是await、signal
,对应的就是wait、notify
;
直接写一个demo,看看效果
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
AtomicInteger target = new AtomicInteger(1);
new Thread(() -> {
try {
lock.lock();
while(true){
if(target.get() % 2 != 0){
System.out.println(Thread.currentThread().getName() + "执行,target:" + target.getAndIncrement());
TimeUnit.SECONDS.sleep(1);
}else{
conditionA.signalAll();
System.out.println(Thread.currentThread().getName() + "进入等待");
conditionA.await();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
},"线程1").start();
new Thread(() -> run(lock, conditionA, target, 2),"线程2").start();
new Thread(() -> run(lock, conditionA, target, 3),"线程3").start();
}
private static void run(Lock lock, Condition conditionA, AtomicInteger target, int i) {
try {
lock.lock();
while (true) {
if (target.get() % i == 0) {
log.log(Thread.currentThread().getName() + "执行,target:" + target.getAndIncrement());
TimeUnit.SECONDS.sleep(1);
} else {
conditionA.signalAll();
System.out.println(Thread.currentThread().getName() + "进入等待");
conditionA.await();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
分析下执行结果
线程1执行,target:1
线程1进入等待
线程2执行,target:2
线程2进入等待
线程3执行,target:3
线程3进入等待
线程1进入等待
线程2执行,target:4
线程2进入等待
线程3进入等待
线程1执行,target:5
线程1进入等待
线程2执行,target:6
线程2进入等待
线程3进入等待
线程1执行,target:7
线程1进入等待
线程2执行,target:8
线程2进入等待
线程3执行,target:9
1)线程1*在 *target != 2
时获取锁,否则进行等待并唤醒所有其他线程,当target==1
时,线程1
获取锁,执行输出,担当target增长为2时,线程1进入等待,释放锁,并唤醒线程2和线程3
2)线程2
在 target % 2==0
时获取锁,否则进行等待并唤醒所有其他线程,当target%2==0时,线程2获取锁,执行输出,担当target增长为3时,线程2进入等待,释放锁,并唤醒线程1和线程3
3)线程3
在 target % 3==0
时获取锁,否则进行等待并唤醒所有其他线程,当target%3==0
时,线程3
获取锁,执行输出,担当target增长为4
时,线程3
进入等待,释放锁,并唤醒线程1
和线程2
。
依照上面的demo,那么接下来我们一步步具体看看我们使用到的实现类:AbstractQueuedSynchronizer#ConditionObject
1.定义Condition
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
// ... 进入 ReentrantLock
public Condition newCondition() {
return sync.newCondition();
}
// ... 进入 ReentrantLock.Sync
final ConditionObject newCondition() {
return new ConditionObject();
}
// ... 进入 AbstractQueuedSynchronizer
public ConditionObject() { }
public class ConditionObject implements Condition, java.io.Serializable{ ... }
可以看到 lock.newCondition()
最终是通过sync.newCondition();
进行构造,通过代码知道sync就是AbstractQueuedSynchronizer的实现类(即自定义的队列同步器
)。由此可以看出,Condition其实是由AbstractQueuedSynchronizer的实现类new出来的,可以看到最终的实现类是AbstractQueuedSynchronizer#ConditionObject
,ConditionObject实现至Condition,因此必然实现了具体的等待唤醒机制,那么接着结合demo进行进一步拆解。
简单用图示表示就是如下结构:
Condition这里有一点很重要,那就是通过指定ReentrantLock对象来new出Condition,可以让该Condition指向了一把具体的锁,那么线程调用该Condition进行await或者signal时,就明确知道操作的是哪把锁,因此就能与其他竞争该锁的线程进行通信了。
那么分析一下这个<u>ConditionObject</u>对于等待唤醒机制是如何做的。
先记住一点:一个condition也就是一个队列,就是存放竞争该condition而导致等待线程的队列
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter; // 头节点
private transient Node lastWaiter; // 尾节点
...
}
不错,ConditionObject是AQS的内部类,它内部维护了头尾节点,节点也是通过AQS中定义的Node构造而成,因此形成了一个同步队列。通过前面AQS的分析,Node中维护了线程及前后节点的指针,因此就很好理解,await就是将线程构造成Node加入队列,signal就是唤醒队列中指定的节点中的线程
但是这里有一点与AQS中的node的区别一定要注意(上图中也明确表示):AQS中的队列时同步双向队列,通过next及pre维护前后节点;但是Condition中的队列时单向等待队列,通过nextWaiter维护下一个节点;这也是在前面介绍AQS中队列节点时,nextWaiter是空的原因。
2.核心原理分析
public final void await() throws InterruptedException {
// 如果线程被中断,异常出去
if (Thread.interrupted()) throw new InterruptedException();
// 根据当前线程构造node,并加入到等待队列
Node node = addConditionWaiter();
// 遍历等待队列,尝试释放锁,该逻辑在AQS中有讲
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断是否是同步队列(这里主要是与AQS中的同步队列作比较)
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 尝试再次获取同步状态,若成功则退出等待时重新中断,(返回true代表没有获取成功)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT; // 退出等待时重新中断
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 清空waitStatus != -2 的节点(waitStatus>0代表节点已经取消了)
if (interruptMode != 0)
// 进入等待吧
reportInterruptAfterWait(interruptMode);
}
这里的逻辑主要就是将当前调用await()
的线程包装成node
节点,并加入到等待队列中去,当然在真正加入队列前还进行了一次尝试。同时设置waitStatus=-2
,表示当前节点在等待队列中。
// 唤醒
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 找到等待队列中的头节点
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
// 一直遍历队列中的节点
Node next = first.nextWaiter;
first.nextWaiter = null;
// 唤醒
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
// 状态校验
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 执行唤醒
LockSupport.unpark(node.thread);
return true;
}
上面逻辑也很简单,无非就是遍历等待队列中的所有节点,进行一定的状态校验后,进行唤醒操作。
-
2.1 按照demo中的逻辑,首先
线程1(T1)、线程2(T2)、线程3(T3)
同时竞争资源(AQS),因为原子变量target初始值是1
,因此此刻只有T1获取到锁,那么刚开始队列是这样的:
- 2.2 T2、T3尝试执行时,发现条件不满足,因此调用await()进入等待队列,此刻队列是这样的:
-
2.3 随着
target递增为2
,此时T1判断条件不满足,调用await()进入等待队列,同时signalAll()
唤醒所有在等待队列中的T2、T3
,,同时T2
根据条件成功获取到锁,此刻队列时这样的:
- 2.4 同时由于T3条件不满足,自身调用await()进入等待,同时唤醒所有等待的线程(这个过程比较快):
-
2.5 随着
target递增为23
,此时T2判断条件不满足,调用await()进入等待队列,同时signalAll()
唤醒所有在等待队列中的T3
,,同时T3
根据条件成功获取到锁,此刻队列时这样的:
- 2.6 随着原子变量target的递增,反复上叙同步队列与等待队列的来回操作,就实现了线程间的等待与唤醒。
以上就是Condition的实现机制,主要是借助Node维护一个单向队列,实现线程的等待与唤醒。这里一定要区分等待队列与同步队列
的区别,等待队列时Condition维护的,主要是实现显式锁的释放与唤醒,而同步队列时AQS维护,主要是实现资源竞争的同步等待。
其实后面介绍ReentrantLock的时候。就比较简单了,同时AQS、Condition、ReentrantLock在后面要讲的同步阻塞队列中使用的非常多,可以特别关注下。
网友评论