一、前言
Jdk中独占锁的实现除了使用关键字synchronized外,还可以使用ReentrantLock。虽然在性能上ReentrantLock和synchronized没有什么区别,但ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。
使用synchronized结合Object上的wait和notify方法可以实现线程间的等待通知机制。Condition同样可以实现这个功能,而且相比前者使用起来更清晰也更简单。前者是java底层级别的,后者是语言级别的,后者可控制性和扩展性更好。
与wait/notify区别
-
1.Condition能够支持不响应中断,而通过使用Object方式不支持。
-
2.Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个。
-
3.Condition能够支持超时时间的设置,而Object不支持。
二、Condition实现生产者和消费者模式
为了方便理解,我们先写一个用condition实现的生产者消费者的例子。
/**
* @Description: 演示Condition实现生产者和消费者模式
*/
public class ConditionDemo2 {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
private ReentrantLock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Producer producer = conditionDemo2.new Producer();
Consumer consumer = conditionDemo2.new Consumer();
new Thread(producer).start();
new Thread(consumer).start();
}
class Consumer implements Runnable{
@Override
public void run() {
consume();
}
public void consume(){
while(true){
lock.lock();
try{
while (queue.size() == 0){
System.out.println("队列空,等待数据");
notEmpty.await();
}
Integer poll = queue.poll();//走过await()证明队列不为空,取出数据
System.out.println("消费者消费数据:"+poll+",队列剩余数据数量:"+queue.size());
notFull.signalAll();//获取数据之后,队列肯定有空闲,那么唤醒生产者进行生产
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
class Producer implements Runnable{
@Override
public void run() {
produce();
}
public void produce(){
while(true){
lock.lock();
try{
while (queue.size() == queueSize){
System.out.println("队列已满,等待空余");
notFull.await();
}
queue.offer(1);//走过await()证明队列有空闲,开始往队列里生产数据
System.out.println("生产者向队列生产一个数据,队列剩余空间:"+(queueSize-queue.size()));
notEmpty.signalAll();//向队列生产数据之后,队列不为空,那么唤醒消费者进行消费
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}
Condition注意点
- 实际上,如果说Lock用来替代synchronized,那么Condition就是用来代替相对应的Object.wait/notify的,所以在用法和性质上,几乎一样。
- await方法会自动释放持有的Lock锁,和Object.wait一样,不需要自己手动先释放锁。
- 调用await的时候,必须持有锁,否则会抛异常,和Object.wait一样。
三、原理分析
在AQS中存在两个FIFO队列:同步队列(等待队列)和条件队列。
同步队列(等待队列):ReentrantLock实现原理
本文主要是讲condition实现原理(即条件队列),条件队列是由Condition内部实现的,是一个虚拟的FIFO单向队列,在AQS中同步队列、等待队列组成关系:
-
1、AQS中tail 和 head主要构成了一个FIFO双向的同步队列。
-
2、AQS中condition构成了一个FIFO单向条件队列。condition是AQS内部类,每个Condition对象中保存了firstWaiter和lastWaiter作为队列首节点和尾节点,每个节点使用Node.nextWaiter保存下一个节点的引用,因此等待队列是一个单向队列。
3.1 队列关系
在Object的监视器(monitor)模型上,一个对象拥有一个同步队列和一个等待队列;而并发包中的AQS上拥有一个同步队列和多个等待队列。两者的具体实现原理的有所不同,但在多线程下等待/唤醒 操作的思路有相同之处,Object的监视器模型 和 AQS对同步队列、等待队列对应关系如下图
3.1.1、Object的监视器模型同步、等待队列对应关系图
3.1.2、AQS中同步队列、条件队列对应关系图
当多线程并发访问AQS的lock()、await()、single()方法时,同步队列和等待队列变化处理过程包括:
- 1、多个线程执行lock()方法时,线程会竞争获取同步锁state,获取成功的线程占有锁state、获取失败的线程会封装成node加入到AQS的同步队列中,等待锁state的释放。
- 2、等获取了state锁的线程(同步队列中head节点)执行await()方法时,condition会将当前线程封装成一个新的node添加到condition等待队列的尾部,同时阻塞(waiting),直到被唤醒。
- 3、等获取了state锁的线程(同步队列中head节点)single()方法时,condition会将等待队列首节点移动到同步队列的尾部,直到获取同步锁state才被唤醒。
3.2 Condition的实现
3.2.1 等待的实现
当线程调用Condition.await()方法时,将会把前线程封装成node节点,并将节点加入等待队列的尾部,然后释放同步state状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。当前线程加入Condition的等待队列逻辑如下图:
-
1、能够调用Condition.await()方法的节点是获取了同步state锁的node,即同步队列中的head节点;调用Condition的await()方法(或者以await开头的方法)会使当前线程进入等待队列并释放锁、唤醒同步队列中的后继节点,最后线程状态变为等待状态。
-
2、Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。
-
3、调用Condition.await()节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了state锁的线程,也就是说该过程是由锁来保证线程安全的。
3.2.2 通知的实现
整个signal()的过程可以总结如下:
-
1、执行signal()唤醒线程时,先判断当前线程是否是同步锁state持有线程,所以能够调用signal()方法的线程一定持有了同步锁state。
-
2、自旋唤醒等待队列的firstWaiter(首节点),在唤醒firstWaiter节点之前,会将等待队列首节点移到同步队列中。
四、源码分析
可以看到,想要获得一个condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用其实为AQS中的内部类ConditionObject。
Condition condition = lock.newCondition();
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
}
}
condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。condition内部也是使用相似的方式,内部维护了一个单向的条件队列,所有调用condition.await方法的线程会加入到条件队列中,并且线程状态转换为等待状态。
ConditionObject中有两个成员变量:头节点firstWaiter 和 尾节点lastWaiter ,条件队列的成员Node 复用了实现同步队列的内部类Node。用nextWaiter保存了下一个等待节点。
用Object的方式Object对象监视器上只能拥有一个同步队列和一个等待队列,而使用Lock可以有有一个同步队列和多个等待队列。可以多次调用lock.newCondition()创建多个Condition,所以一个Lock可以持有多个等待队列。
4.1 await等待
只有线程获取到lock之后,才可以使用condition的await方法。假设此时线程1获取到了ReentrantLock锁,在执行代码逻辑的时候,发现某些条件不符合,于是调用了condition.await();代码:
此时AQS主要执行以下动作:
- 线程1把自己包装成节点,waitStatus设为CONDITION(-2),追加到ConditionObject中的条件队列(每个ConditionObject有一个自己的条件队列);
- 线程1释放锁,把state设置为0;
- 然后唤醒等待队列中head节点的下一个节点;
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
//如果当前线程中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
//删除无效的等待节点
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
}
}
保存新节点addConditionWaiter()方法如下。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
private Node addConditionWaiter() {
Node t = lastWaiter;
// 清除被取消的尾节点
if (t != null && t.waitStatus != Node.CONDITION) {
//解除关联
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程保存在Node中
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
//队尾插入
t.nextWaiter = node;
//更新lastWaiter
lastWaiter = node;
return node;
}
}
}
将当前节点保存到新建立的Node,如果等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node,否则,更新lastWaiter(尾节点)即可。可以看出等待队列是一个不带头结点的链式队列,而AQS中的同步队列是一个带头结点的链式队列。
将当前节点插入到等待对列之后,会调用fullyRelease,使当前线程释放lock。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
//成功释放同步状态
failed = false;
return savedState;
} else {
//不成功释放同步状态抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
}
方法内部调用AQS的模板方法release方法释放AQS的同步状态,并且唤醒在同步队列中头结点的后继节点引用的线程,如果释放成功则正常返回,若失败的话就抛出异常。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
//......
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//......
}
}
}
当线程第一次调用condition.await()方法时,会进入到这个while()循环中,然后通过LockSupport.park(this)方法使得当前线程进入等待状态,那么要想退出这个await方法第一个前提条件自然而然的是要先退出这个while循环,有两种可能:
- 逻辑走到break退出while循环(当前等待的线程被中断)
- while循环中的逻辑判断为false(当前节点被移动到了同步队列中,即另外线程调用的condition的signal或者signalAll方法)。
总的说就是当前线程被中断或者调用condition.signal/condition.signalAll方法当前节点移动到了同步队列后 ,这是当前线程退出await方法的前提条件。当退出while循环后就会调用acquireQueued(node, savedState)(之前Reentlock中讲过),自旋过程中线程不断尝试获取同步状态,直至获取lock成功。这也说明了退出await方法必须是已经获得了condition关联的lock。
4.2 signal唤醒
当另一个线程执行了 condition.signal之后,主要是做了以下事情:
- 1、把条件队列中的第一个节点追加到等待队列中;
- 2、把等待队列原来尾节点的waitStatus设置为SIGNAL。
然后继续处理自己的事情,自己的事情处理完成之后,会释放锁,唤醒等待队列中head节点的下一个节点线程进行工作。
调用condition的signal唤醒一个等待在condition上的线程(头节点),将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回,源码如下。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
public final void signal() {
//1. 先检测当前线程是否已经获取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2. 获取条件队列中第一个节点,之后的操作都是针对这个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
}
}
signal方法首先会检测当前线程是否已经获取lock,没有获取lock会直接抛出异常,再调用doSignal传入头节点。doSignal方法源码为:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
private void doSignal(Node first) {
do {
// 已经是尾节点了
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 将头结点从条件队列中移除
first.nextWaiter = null;
// while中transferForSignal方法对头结点做真正的处理
// 将等待队列中的 Node 转移至 AQS 同步队列, 不成功且还有节点则继续循环
} while (!transferForSignal(first) &&
// 队列还有节点
(first = firstWaiter) != null);
}
}
}
具体逻辑请看注释,真正对头节点做处理的逻辑在transferForSignal,该方法源码为:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
final boolean transferForSignal(Node node) {
// 更新状态为0
// 如果状态已经不是 Node.CONDITION, 说明被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将该节点移入到AQS同步队列尾部
Node p = enq(node);
int ws = p.waitStatus;
// 上一个节点被取消
// 上一个节点不能设置状态为 Node.SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// unpark 取消阻塞, 让线程重新同步状态
LockSupport.unpark(node.thread);
return true;
}
}
4.3 signalAll()源码
signalAll()会从首节点循环遍历条件队列,将条件队列中的所有节点移到同步队列中去。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
public final void signalAll() {
//1. 先检测当前线程是否已经获取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
//遍历条件队列,将条件队列中的node移动到同步队列中
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
//移动节点到同步队列中
transferForSignal(first);
first = next;
} while (first != null);
}
}
}
4.4 不可打断等待 - 直到被唤醒
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
public final void awaitUninterruptibly() {
// 添加一个 Node 至等待队列
Node node = addConditionWaiter();
// 释放节点持有的锁
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 仅设置打断状态
if (Thread.interrupted())
interrupted = true;
}
// 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
}
}
4.5 等待 - 直到被唤醒或打断或超时
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加一个 Node 至等待队列,
Node node = addConditionWaiter();
// 释放节点持有的锁
int savedState = fullyRelease(node);
// 获得最后期限
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// 已超时, 退出等待队列
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 如果被打断, 退出等待队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 退出等待队列后, 还需要获得 AQS 队列的锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 从队列链表删除
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 应用打断模式
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
}
}
4.6 await恢复后继续执行
被唤醒的如果是之前执行了await方法的线程,那么该线程会接着就像往await方法里面阻塞处的下面继续执行,下面是源码:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
//如果当前线程中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
//删除无效的等待节点
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
}
}
可以发现,这里主要是判断到当前线程节点已经放入等待队列了,那么会尝试获取锁,获取成功则继续往下执行代码。
只有线程获取到ReentrantLock的锁之后才可以继续往下执行,中间可能会因为执行await而进入条件队列并释放锁,最后又会被唤醒重新获取锁,继续往下执行。最后按照书写规范,我们一定会在代码中执行ReentrantLock.unlock()释放锁,然后继续唤醒等待队列后续线程继续执行。
总结
-
1、Condition等待通知的本质就是条件队列 和 同步队列的交互的过程,跟object的wait()/notify()机制一样;Condition是基于同步锁state实现的,而objec是基于monitor模式实现的。
-
2、一个lock(AQS)可以有多个Condition,即多个条件队列,只有一个同步队列。
-
3、Condition.await()方法执行时,会将同步队列里的head锁释放掉,把线程封装成新node添加到条件队列中;Condition.signal()方法执行时,会把条件队列中的首节点移到同步队列中去,直到锁state被获取才被唤醒。
参考:
https://www.itzhai.com/articles/analysis-of-reentrantlocks-condition-principle.html
https://blog.csdn.net/e891377/article/details/104715461
https://blog.csdn.net/weixin_42103620/article/details/117331593
https://blog.csdn.net/sinat_32873711/article/details/106619981
网友评论