零:序言
使用过ReentrantLock
的盆友应该也知道Condition
的存在。先讲解下它存在的意义:就是仿照实现Object
类的wait
signal
signallAll
等函数功能的。
这里引申一个面试常问到的问题:wait
会释放锁,sleep
不会。
-
Condition
的通常使用场景是这样的:
生产者消费者模型,假设生产者只有在生产队列为空时才进行生产,则代码类似如下:
Condition emptyCondition = ReentrantLock.newCondition();
Runnable consumer = new Runnable() {
public void run() {
if(queue.isEmpty()) {
emptyCondition.signal(); // emptyObj.notify();
} else {
consumer.consume();
}
}
}
Runnable provider = new Runnable() {
public void run() {
emptyCondition.wait(); // emptyObj.wait();
providerInstance.produce();
}
}
所以我们可以知道Condition
设计的意义了。下面我们来讲解下其实现原理。
一:实现概况
还记得在AQS:JAVA经典之锁实现算法(一)提到的锁实现的Sync Queue
吗?
Condition
的实现是类似的原理:
每个AQS
里有x(视你newCondition
几次)个Condition Queue
,它的结点类也是AQS
内部类Node
。Node
里有一个nextWaiter
,指向下一个在同一Condition Queue
里的Node
。
结构如下图:
- 首先明确下是,
condition.wait
一定是在成功lock
的线程里调用才有效,不然不符合逻辑,同时也会抛出IlleagleMornitorException
。 - 获取锁的线程处于
Sync Queue
的队首,当调用condition.wait
时,该线程会释放锁(即将AQS
的state
置为0),同时唤醒后继结点,后继结点在acquire
的循环里会成功获取锁,然后将自己所在结点置为队首,然后开始自己线程自己的业务代码。
这个过程看下图:
wait状态图_1
- 当waiter_1收到相应
condition
的signal
后,在Condition Queue
中的Node
会从Condition Queue
中出队,进入Sync Queue
队列,开始它的锁竞争的过程。
过程看下图:
所以,这里可以看出来,即使是被signal
了,被signal
的线程也不是直接就开始跑,而是再次进入Sync Queue
开始竞争锁而已。这里的这个逻辑,跟Object.wait Object.signal
也是完全一样的。
二:代码实现原理
我们先看一段运用到condition
的代码案例:
假设生成者在生产队列queue
为空时emptyCondition.signal
才进行生产操作
ReentrantLock locker = new ReentrantLock();
Condition emptyCondition = locker.newCondition();
Runnable consumer = new Runnable() {
public void run() {
locker.lock();
if (queue.isEmpty()) {
emptyCondition.signal();
} else {
...
}
locker.unlock();
}
};
Runnable producer = new Runnable() {
public void run() {
locker.lock();
emptyCondition.wait();
// 开始生产
...
locker.unlock();
}
}
我们从消费者一步一步走,拟定如下这样一套线程切换逻辑:
producer#lock
consumer#lock
producer#await
consumer#signal
consumer#unlock
producer#unlock
(先从Sync Queue Condition Queue
图解讲一遍,然后对应图解,对着代码撸一遍)
-
producer#lock
生产者直接获取锁成功,入队Sync Queue
,位队首
consumer#lock
消费者竞争锁失败,进入Sync Queue
等待获取锁
-
producer#await
生产者进入等待,释放锁,出Sync Queue
,进入Condition Queue
,等待emptyCondition
来唤醒。
-
consumer#signal
消费者唤起生产者,生产者consumer
的node
自Condition Queue
转移到Sync Queue
开始竞争锁。
-
consumer.unlock
consumer
释放锁后,consumer
的node
从Sync Queue
出队,释放state
,唤醒后继结点provider#node
,provider
抢占到锁。
-
provider#unlock
这里就没有啥好说的了。
当然,我为了讲解过程,像在锁被第一次成功获取的时候,逻辑上虽然并不是直接进入
Sync Queue
我也给讲解成直接进入Sync Queue
了,这是为了缩减边边角角的小逻辑,讲清楚主线逻辑。大家看明白主逻辑,然后再自己去撸一遍,就融会贯通了。
三:代码撸一把
-
provider.lock
final void lock() {
// 这就直接获取锁成功了,没有else的逻辑了
if (compareAndSetState(0, 1))
// 这个方法是AQS类用来设置拥有锁的线程实例
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
-
consumer#lock
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// consumer.lock就要走这里了,因为上面的compareAndSetState
// 返回false
else
acquire(1);
}
protected final boolean compareAndSetState(int expect, int update) {
// 楼下这个是CAS原理进行值修改,CAS就对比乐观锁来,
// 这里想要修改this这个对象的state字段,如果state是expect
// 则修改至update,返回true;否则false。我们知道provider.lock
// 已经将state 改为非0值了,所以这里肯定失败啦
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
-
provider#await
先简单看下Condition
类对象结构
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
...
一个Condition
对象就是一条链队,头尾结点在Condition
的内部字段指定firstWaiter lastWaiter
。
看await
方法
public final void await() throws InterruptedException {
// 因为await是响应中断的等待,这里就是检验下,
// 通常而言,凡是throws InterruptedException的,
// 开头基本都是这句
if (Thread.interrupted())
throw new InterruptedException();
// 这里是向condition queue中插入一个node,并返回之,
// 插入了这个node,就代表当前线程在condition queue
// 中开始等待了
Node node = addConditionWaiter();
// 这个是AQS释放锁方法,加个fully,就是用来将多次
// 获取锁一次性都释放掉,然后将锁获取次数返回,
// 留着后面signal后成功获取锁的时候,还要加锁同样的
// 次数。
// !!!同时注意,这里唤醒了后继结点!后集结点就继续开始
// 竞争锁,就是在acquire那个自旋方法里,记得吗
// 不记得去看看文章(一)
int savedState = fullyRelease(node);
// 记录当前线程中断的标记
int interruptMode = 0;
// 判断当前的node是否已经转移到sync queue里了。
// 转移了,说明这个node已经开始竞争锁了,不用再等待
// 唤醒了,没转,继续自旋
while (!isOnSyncQueue(node)) {
// 这里把当前线程给挂起了
LockSupport.park(this);
// 这里的方法checkxxx就是用来检查waiting自旋期间,线程有没有
// interrupt掉。因为await方法是响应线程中断的。
// 若interrupt了,则在checkxxx方法里,会将node转移到
// sync Queue中,去竞争,不要担心,因为同时
// 会设置interruptMode,在最后会根据其值抛Interrupted
// 异常。。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 那什么时候就结束上面的自旋呢?一个是当前的线程被
// signal了,那node就被transfer到sync queue了,while
// 就不满足了。再一个就是线程中断了,在while循环体里给break掉了
}
// 跳出来后,紧接着去竞争锁,知道成功为止。&& 后面这个THROW_IE,标识
// 要抛出异常,不是的话,就是REINTERRPUT,代表保证线程的中断标记不被
// 重置即可。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 这儿是在condition queue里有多个waiter的时候才起作用,主要用来将
// CANCEL的结点从链队中剔除掉
// 具体大家自己看吧。现在忽略这
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 这儿就是处理interruptMode中断标记字段的逻辑
// 在reportxxx中,interruptMode为THROW_IE,则抛出
// 异常,不是,则保证线程的中断field不被重置为“未中断”即可
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
-
consumer#signal
consumer
在调用emptyCondition.signal
的时候,会影响到emptyCondition
的condition queue
中的等待线程,这里
具体指上面的provider#await方法。
public final void signal() {
// 先判断下,lock锁是不是在调用signal方法的当前线程手里
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 取到condition queue里的第一个waiter node,这里也就是
// consumer,因为它第一个await进入condition queue了
Node first = firstWaiter;
// 这里去进行了具体的signal操作,具体会做先把waiter node的waitStatus
// 从CONDITION状态改为入Sync Queue的正常状态值0
// 然后修改Sync Queue 的Head Tail等,让其入队成功
// 最后再从其前驱结点的状态值上确保当前结点能够被唤起即可。
// 这里是因为这个waitStatus值对后继结点的行为是有影响的,像SIGNAL指
// 的是在结点释放后,要去唤醒后继结点
//
if (first != null)
doSignal(first);
}
-
consumer#unlock
unlock
具体调用的 AQS
的release()
方法
public void unlock() {
sync.release(1);
}
// AQS.release
public final boolean release(int arg) {
// tryRelease,这里由NonFairSync实现,具体就是通过
// CAS去修改state值,并判断是否成功释放锁
if (tryRelease(arg)) {
// 成功释放了,则在waitStatus 不是初始状态时,去唤醒后继,
// 这个 != 0 来做判断的原因,就要综合所有情况,
// 像FailSync NonFairSync \ Exclusive \ Share
// 等所有情况来看这里的waitSTatus都会处于什么状态。
// 全撸一遍的话,会发现这里的 != 0能够涵盖以上所有情况。
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
-
provider#unlock
这里就同理上面了。
总结
总体来看两个 queue
的转换还是挺清楚的。只要记住,不管什么情况(中断与否),都是要从condition queue
转移到sync queue
的。具体大家还是要自己去想一种线程切换场景,去走走看。
行文匆匆, 欢迎指正。
网友评论