关于AQS条件队列与同步队列分析
1 概述
本文主要演示一下Condition中同步队列和条件队列是如何交互的。
1.1 交互流程
image.png图 1 条件同步队列(图片来源网上)
2源码分析(ConditionObject)
2.1 条件队列等待await
1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。
2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。
3.自旋(while)挂起,直到被唤醒(signal把他重新放回到AQS的等待队列)或者超时或者CACELLED等。
4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。
2.1.1 await源码分析
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //创建条件节点并加到条件队列
int savedState = fullyRelease(node); // 释放锁,返回旧值
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 不在同步队列里等待signal
LockSupport.park(this); // 阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 清除中断标志位
break;
}
// 被唤醒了,将节点移动到同步队列里面,自旋并阻塞等待信号unpark
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public ReentrantLock() {
sync = new NonfairSync();
}
2.2 条件队列通知signal
2.2.1 signal源码分析
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
first.nextWaiter = null;
} while (!transferForSignal(first) && // 加到同步队列,然后unpark
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
//ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒node对应的线程
return true; // 通知结束,
}
从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。
对每个节点执行唤醒操作时,将节点加入同步队列,再等待同步队列被唤醒(什么时候,unlock时)。
2.3 细节分析
2.3.1 waitStatus几种值
CANCELED:1
0
SIGNAL:-1
CONDITION:-2
PROPAGATE:-3
2.3.2在同步队列的判断条件
参数node waitStatus==-2或者前置节点null,不在,否则下一步
Next != null,在,否则下一步
从同步队列尾部查找node
2.3.3条件队列转入同步队列
同步队列的初始化节点:initNode(默认waitStatus:0)
取出条件队列中的第一个等待节点(waitStatus:-2),cas操作将waitStatus修改为:0,然后如同步队列。
取同步队列的前置节点。
如果waitStatus>0(实际是CANCELED),或者 CAS 失败,会进到这里唤醒线程,直接唤醒当前node节点(否则等待lock.unlock操作)
2.3.4时序(重点)
这里:await LockSupport.park(this);t2:doSignal时,还是park在此
直到t2:lock.unlock()才能被唤醒,在同步队列中自旋。
3代码示例(ConditionObject)
3.1 等待线程
package com.kikop.demo.MyAqsConditionQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kikop
* @version 1.0
* @project Name: javainaction
* @file Name: MyConditionAwaitRunnableTask
* @desc 功能描述
* @date 2020/6/13
* @time 22:41
* @by IDE: IntelliJ IDEA
*/
public class MyConditionAwaitRunnableTask implements Runnable {
private ReentrantLock reentrantLock;
private Condition condition;
public MyConditionAwaitRunnableTask(ReentrantLock reentrantLock, Condition condition) {
this.reentrantLock = reentrantLock;
this.condition = condition;
}
@Override
public void run() {
try {
reentrantLock.lock(); // 当前t2节点加到同步队列
System.out.println(Thread.currentThread().getName() + "lock,拿到锁了");
System.out.println(Thread.currentThread().getName() + "等待信号");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "拿到信号");
} finally {
reentrantLock.unlock();
System.out.println(Thread.currentThread().getName() + "unlock结束");
}
}
}
3.2 唤醒线程
package com.kikop.demo.MyAqsConditionQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kikop
* @version 1.0
* @project Name: javainaction
* @file Name: MyConditionAwaitRunnableTask
* @desc 功能描述
* @date 2020/6/13
* @time 22:41
* @by IDE: IntelliJ IDEA
*/
public class MyConditionSignalRunnableTask implements Runnable {
private ReentrantLock reentrantLock;
private Condition condition;
public MyConditionSignalRunnableTask(ReentrantLock reentrantLock, Condition condition) {
this.reentrantLock = reentrantLock;
this.condition = condition;
}
@Override
public void run() {
try {
reentrantLock.lock(); // 当前t2节点加到同步队列
System.out.println(Thread.currentThread().getName() + "lock,拿到锁了");
try {
Thread.sleep(20*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "发出信号");
condition.signal(); // 通知条件队列中第一个节点,此时条件队列-->加到同步队列,并仍然阻塞
} finally {
reentrantLock.unlock(); // 从同步队列中头节点开始找,并unpark
System.out.println(Thread.currentThread().getName() + "unlock结束");
}
}
}
3.3 测试
package com.kikop.demo.MyAqsConditionQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kikop
* @version 1.0
* @project Name: javainaction
* @file Name: AqsConditionQueueTest
* @desc 功能描述
* @date 2020/6/13
* @time 22:06
* @by IDE: IntelliJ IDEA
*/
public class AqsConditionQueueTest {
public static void test() {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
Thread t1 = new Thread(new MyConditionAwaitRunnableTask(reentrantLock, condition), "thread1");
Thread t2 = new Thread(new MyConditionSignalRunnableTask(reentrantLock, condition), "thread2");
t1.start();
t2.start();
}
public static void main(String[] args) {
test();
}
}
thread1lock,拿到锁了step1
thread1等待信号 step2
thread2lock,拿到锁了 step3
thread2发出信号 step4
thread2unlock结束(等待中) step5
thread1拿到信号 step6
thread1unlock结束 step7
3.4图解步骤
image.png图 2 t1由于不在同步队列找中,Await一直park趴在这儿(unlock也不会执行)
image.png图 3 t2发出信号
image.png图 4 t2 unlock
image.png图 5 进入同步队列中自旋
image.png图 6移动同步队列的头结点指针,当前node作为头结点,前置节点位置null,让gc回收(首次即为始化的initNode
image.png图 7 t1执行业务逻辑
image.png图 8 t1释放当前节点,unpark后置等待节点,整个流程结束
网友评论