AQS深入分析总结
AQS
很久之前便写了这篇文章,一直没有时间发出来,文章如果有写的不好的地方,欢迎大家能够指正,下面开始详细分析介绍,希望大家能够耐心读下去,肯定会受益匪浅的,AQS是Java JUC中最为核心的一个基础组件,所以它的重要性不言而喻。AQS中主要包含了获取和释放的基本方法,它的模式有独占和共享。它的几个比较重要的公共方法,如下所示。
重要的公有方法
// 和获取相关的方法
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
// 和释放相关的方法
public final boolean release(int arg)
public final boolean releaseShared(int arg)
重要方法分析
Node分析
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 非常重要的一个字段,表示了Node运行时各种各样的状态
volatile int waitStatus;
// 前一个节点
volatile Node prev;
// 后一个节点
volatile Node next;
// node中维护的线程
volatile Thread thread;
Node nextWaiter;
// 当前节点是否是共享的
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取当前node的前一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
acquire方法分析
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
源码解析
- 使用acquire方法 传一个入参表示需要获取的锁的个数,然后调用tryAcquire方法尝试去获取锁
- tryAcquire是交给子类去重写的,如果子类获取到了锁,那么tryAcquire会直接返回true,那么此方法到此也就结束了。
- 如果 tryAcquire 方法返回了false, 表示没有获取到锁,那么会接着往下执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),首先会去执行addWaiter(Node.EXCLUSIVE), 表示将当前线程封装成Node加入到CLH队列中,接下来分析其源码
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
源码解析
- 将当前线程封装成一个Node节点,当前节点的类型是EXCLUSIVE。
- 拿到CLH队列的尾结点,如果尾节点不为空,然后尝试使用CAS的方式 将当前node 连接到 tail的尾部,将当前节点设置为tail, 如果成功了就直接返回node节点就好了
- 如果没有成功,接着去执行enq方法,进行自旋不断的尝试去连接到CLH队列的尾部。
<aside>
💡 为什么 在连接尾结点的时候,首先 将当前node的prev节点指向尾部,而不是使用CAS成功之后再设置prev节点呢?
</aside>
一方面是基于CAS的双向链表插入目前没有完美的解决方案,另一方面这样子做的好处是:
保证每时每刻tail.prev都不会是一个null值,否则如果 node.prev = t
放在下面if的里面,会导致一个瞬间tail.prev = null,这样会使得队列不完整
接下来接着分析enq方法的源码
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 很明显的可以看到,使用了自旋的方式,不断的去进行尝试,将当前节点插入到队列的尾部
- 首先拿到尾结点,如果尾结点为空,那么说明必须进行初始化,head节点同样的肯定也为null, 使用CAS的方式创建Head节点,避免多个线程同时竞争的去创建,然后创建成功之后,将当前head节点赋值给 tail 节点
- 否则tail节点不为空,说明当前head节点也不为空喽,先将当前node节点的前置节点指向尾结点,然后通过CAS的方式设置尾结点,之后再将原先尾结点的后置节点设置为Node, 最后返回原尾结点
当enq方法执行完之后 又回到了addWaiter方法,接着就回到了 acquireQueued 方法的调用,再来分析下 acquireQueued 方法的执行流程
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 声明了两个标记,失败标记和中断标记,接下来开始自旋,拿到当前node的前置节点,如果它的前置节点是Head节点,才有资格去 tryAcquire, 如果tryAcquire拿到了锁,就可以去设置新的head了, 将当前节点设置为head节点,注意 head节点不存储线程和前置节点
- 接下来把原head节点的next置为null, 便于GC垃圾回收,failed标记为faal
- 如果当前Node的前置节点不是head节点或者没有拿到锁,那么就进行接下来的判断了,判断是否应该park住,也就是阻塞住,然后如果是,就去park然后等唤醒的时候检查中断标记,如果是已中断,那么就将中断标记置位true,然后下一轮 会接着自旋去尝试获取锁。
- 如果上述都完成或者有异常抛出的时候,会进入到 finally 方法中 判断是否是失败了,如果是,那么当前节点就去取消获取锁。
下面开始分析下 shouldParkAfterFailedAcquire方法做了哪些事儿?首先上源码
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
该方法用于判断 当前节点获取锁失败 是否需要去阻塞等待,入参是当前节点的前置节点和 当前节点
- 拿到前置节点的waitStatus, 然后进行判断,如果它的前置节点的waitStatus是 SIGNAL 的,那就直接返回true了,由此可见,当 node 的前置节点的waitStatus是SIGNAL状态的,那就直接park住了
- 如果 waitStatus 大于0 ,那么表示当前节点是CANCEL状态了,那么就直接进行从前往后的遍历 然后如果waitStatus的状态大于0 那就直接忽略掉了,也就是去掉了CLH队列中 waitStatus 为CANCEL 的节点, 然后返回false, 然后等到下一轮自旋,会再次进行判断的。
- 如果 waitStatus 小于等于0,那么就直接把前置节点的waitStatus改为 SIGNAL 状态,然后等到下一轮自旋,尝试获取锁,然后下一轮如果还走到了这个方法里面,这时候 前置节点的 waitStatus 状态就是SIGNAL的了,就直接返回true了
<aside>
💡 上面方法在断连 取消节点的时候,为什么是从前往后遍历的呢?
</aside>
这个其实上面已经给出了答案了,在往CLH队列中添加node的时候,首先将node的前置节点指向尾结点,然后再通过CAS的方式,将尾结点设置成node节点,如果从前往后开始遍历,那么可能会有一瞬间 队列是断的,而从后往前则是完整的一个队列
下面再来分析 parkAndCheckInterrupt 方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
这个方法主要干了两件事,只有 shouldParkAfterFailedAcquire方法 返回 true的时候,才会调此方法
- park住当前的线程,也就是阻塞住当前的线程
- 等线程被唤醒的时候,检查当前线程的中断标志
分析完 parkAndCheckInterrupt() 方法其实这个 acquireQueued 方法算是分析完了,还差一个 cancelAcquire 方法接着往下分析, 首先上源码
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
从以上源码看的话,cancelAcquire 方法其实还挺复杂的
如果当前node是null, 那就直接返回了,如果不为null的话,首先先把这个node中封装的线程置为null
- 拿到当前node的前置节点,从后往前进行遍历 跳过waitStatus > 0 取消的节点,直到将 要取消的node的前置节点指向一个非取消状态的node
- 然后把node 之前 非取消节点的 下一个节点拿到,将当前准备要取消的节点状态置为取消
- 如果要取消的节点是尾结点,那肯定要用CAS的方式把当前尾结点之前的非取消节点置为 尾结点,然后如果成功了,要用CAS 把尾结点的Next节点 置为null的
- 否则,如果要取消的那个节点不是尾结点, 然后就根据条件进行判断是否是将前后节点连接起来还是直接唤醒后置节点
- 如果要取消的这个node的前置节点 1. 前置节点不是 head节点 2.前置节点的waitStatus是SIGNAL,或者不是SIGNAL 但是通过CAS的方式设置为了SIGNAL. 3.前置节点的线程不能为空,此三种情况,就把pred前置节点的下一个节点置为取消节点的 下一个节点
- 如果不是上面3种情况,那就直接调unparkSuccessor方法进行唤醒后续节点,那么为什么需要去唤醒后续节点呢?其实CLH队列在某一时刻 可能会出现下图这么个情况
[图片上传失败...(image-bb414b-1649039795372)]
分析到这儿,其实acquire方法也就分析完了,下面会进行分析release方法,在分析release方法之前,先上一张图
[图片上传失败...(image-b6f5d7-1649039795372)]
上图表示了 某一时刻 CLH队列可能出现的情况
release方法分析
release方法主要用于释放锁,下面首先上源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- tryRelease 方法交由子类去实现,如果子类返回true, 表示释放成功,下面需要进一步去处理
- 把head节点拿到,如果h不为空,并且它的waitStatus不为0,那么说明后续的节点需要进行唤醒,然后返回true
- 如果 tryRelease 方法返回了false,那最后也就直接返回了false.
从上面 方法 可以看到如果需要唤醒后续节点 调用的方法是 unparkSuccessor() 方法,下面来分析下这个方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
从上面方法可以看到 此方法 实现的其实并不复杂,仅这一个方法 release 方法就执行完了
- 拿到当前node 的waitStatus,这里的node 在上一步中 传入的是head.
- 如果node的waitStatus的值 小于 0,那么就使用CAS的方式 将它置为0
- 然后拿到当前节点的后置节点,如果当前节点的后置节点为null 或者 它的waitStatus状态大于0,那么这个节点肯定是不合法的或者是取消了的,就从后往前找,找到waitStatus 小于等于0 的节点, 至于为何从后往前遍历,其实前头已经解释过了, waitStatus≤0 表示是中间需要唤醒的节点 或者是 尾结点(waitStatus=0)
- 如果找到了,然后就判断下 不为空的话,就把它内部的线程给唤醒 (unpark).
综上,可以看到 release 方法其实很简单, 接下来接着分析 acquireShared 方法
acquireShared方法分析
acquireShared 表示的是获取共享锁,下面直接看它的代码
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
首先 调用 tryAcquireShared 方法,此方法由子类去实现,如果子类返回了 大于等于 0的数,那么表示拿到了锁,
否则需要调用 doAcquireShared 方法,去自旋再次尝试获取共享锁
接着 看下 doAcquireShared 方法的源码
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 首先 addWaiter 该方法在上面 已经分析完了,将当前线程封装成一个Node, Node的类型为SHARED, 然后尝试加入 CLH队列的尾部,如果加入失败,则采用自旋的方式 不断的尝试加入
- 同样的 声明 失败标记和中断标记,拿到当前node的前置节点,如果其前置节点是head节点,这时候 说明它有资格去尝试获取共享锁,调用tryAcquireShared 去尝试拿到共享锁,如果返回了 大于等于0 的数,表示拿到了共享锁,那么会进行向下传播
- 由源码中可以看到,当前node拿到共享锁后,会 接着 去调用setHeadAndPropagate方法,重置head节点和向下传播,然后将原head节点的next置为了null, 以便于让GC回收掉,判断是否是中断状态,设置中断,failed置为false, 然后直接返回,就结束了
- 如果当前node节点的前置节点 不是 head节点,或者没有拿到共享锁,这时候 就会接着往下走,下面的代码就和独占锁是一样的了
下面来分析下 当前node拿到共享锁,执行了 setHeadAndPropagate方法,都做了哪些事
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
- 首先记录当前head节点,然后将当前传进来的node设置成新的head节点
- propagate > 0 表示拿到了共享锁,还有锁可以拿,如果 等于 0 ,说明 我这次获取共享锁成功后,没有剩余共享锁可以获取,大于0,说明我这次获取共享锁成功后,还有剩余共享锁可以获取,所以需要唤醒后续线程
- h == null || h.waitStatus < 0 判空的一种经典写法,旧head是否为null, 一般情况下,是不可能等于null的,除非旧head刚好被GC掉了,h==null不满足,会继续判断h.waitStatus < 0, h.waitStatus可能等于0,可能等于-3
waitStatus = 0的情况,某个线程释放了锁 (release 或者 releaseShared) 或者前一个节点获取共享锁传播 setHeadAndPropagate ,唤醒后继节点的时候将 h.waitStatus = -1 设置为0,waitStatus = -3, doReleaseShared唤醒Head后继节点后 waitStatus 由 -1 变为 0,被唤醒的共享节点还没来得及获取锁,即还没有执行setHeadAndPropagate, 又有其它线程doReleaseShared 唤醒 head后继节点 h.waitStatus 从 0 到 -3
-
(h = head) == null || h.waitStatus < 0 首先判断新head是否为空,一般情况下 新head 不为空, (h = head) == null 不满足,判断 h.waitStatus < 0, h.waitStatus 可能等于0,可能小于0 (-1 or -3), h.waitStatus = -1, 已经调用了 shouldParkAfterFailedAcquire(), h.waitStatus 从 0 or -3 改为 -1, 可能阻塞,也可能未阻塞
-
拿到node的下一个节点,如果 是共享节点,那么就去唤醒阻塞节点,s == null || s.isShared() , s == null, 这个条件是可能会成立的,如果s是最后一个节点 那么它将会是null。
接下里分析下 doReleaseShared方法的源代码
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
- 开始自旋了,将head头节点拿到,然后判断下 不为空,并且不等于尾结点的话,紧接着判断 它的waitStatus,如果它的waitStatus状态等于 SIGNAL ,那就尝试去利用CAS去设置成 0 如果失败了,继续自旋
- 如果成功了,开始调用 unparkSuccessor(h), 唤醒后续节点, unparkSuccessor 方法,在上述已经分析过了
- 如果waitStatus的状态不等于 SIGNAL,而等于0,那么这时候 就利用CAS 将它的 状态改为 PROPAGATE,等于0的时候,上述已经提到过了
- 如果 h==head,也就是head节点没变,那么就直接break掉,如果head节点变了,就接着去自旋判断 尝试去唤醒,如果head节点变了,说明有其它线程在尝试获取锁的时候,拿到了锁,比如,可能有其它node 调用了 setHeadAndPropagate,这时候有可能是需要去唤醒后续节点的,所以这里就直接进行了 自旋,进行再次判断。
到此 acquireShared 方法算是分析完了,接下来分析 releaseShared 方法
releaseShared方法分析
话不多说,直接上源码
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
首先让子类去尝试 释放共享锁,如果成功了,就接着去调用doReleaseShared方法,尝试去唤醒后继节点,然后返回成功,否则返回失败
doReleaseShared 方法 上述已经分析过了,主要做了以下几件事,再来总结下
- 自旋,拿到head节点,判断head节点的waitStatus状态,如果为SIGNAL,就利用CAS将其改为0,如果失败,就继续自旋,知道成功后 调用 unparkSuccessor(h) 方法 唤醒后续 park住的节点
- 如果 waitStatus == 0, 就将其改为 -3,如果失败,就自旋,最后判断 head节点是否变了,如果没变,就跳出自旋
releaseShared 方法其实并不复杂,这样看,甚至挺简单的,下面分析下 acquireInterruptibly 方法吧
下面上个图
[图片上传失败...(image-364508-1649039795372)]
acquireInterruptibly方法分析
从它的名字可以看出 可中断的获取,上源码开始分析
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
- 首先直接判断 当前线程的中断状态,如果当前线程已经是中断了的,那么将直接抛出异常。
- 然后执行的操作就和 acquire 方法中的一样了, 使用 tryAcquire 去获取锁,如果没拿到,就调用 doAcquireInterruptibly 方法
下面直接看下 doAcquireInterruptibly 方法都做了哪些事儿
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 看上面的代码 其实是和 acquireQueued 中的方法差不多的 只不过加了对中断的处理
- 调用addWaiter方法,节点类型是独占类型,然后开始自旋,不断的去判断是否是头节点,然后尝试去获取锁
- 如果拿到了锁,就重置头节点,然后标记失败为false, 与 acquireQueued 方法不同的是,这个方法没有声明中断标记,而是在 下面的if条件判断 为true 之后,就直接抛出中断异常了,parkAndCheckInterceptor() 方法如果检查出来这个线程已经是中断状态的了,那就直接抛出中断异常
acquireSharedInterruptibly方法分析
其实这个方法和上面的方法都大同小异了,就是检查出来 如果当前节点维护的线程已经是中断状态的了,那就直接抛出中断异常,不会再往下执行了
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
如果检查出来已中断,就抛出中断异常,然后下面的逻辑就和 acquireShared中的一样了
下面 来看下 doAcquireSharedInterruptibly() 方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
同样的加入了 对 中断的处理,如果检查出来了 该线程已被中断,就直接抛出中断异常
tryAcquireNanos方法分析
tryAcquireNanos 方法 直译过来就是尝试去获取,该方法是带有超时时间的,如果超时时间一到,就会立马返回,不会再去自旋尝试获取,下面开始看下它的源代码
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
- 首先还是像之前一样,如果该线程已被中断,那就直接抛出中断异常
- 尝试去获取锁,如果成功了 就直接返回true,这个tryAcquire方法也是由子类去实现的,如果它返回了false,说明没有拿到锁,那就接着去调用 doAcquireNanos 方法了
下面来看下 doAcquireNanos 方法
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 从它的源码中,可以看出,其实此方法和上述的几个 acquireQueued 方法其实是差不多的,只不过这个方法加入了对超时的一些处理,增加了一个入参,超时时间
- 如果当前 nanosTimeout ≤ 0, 那么就直接返回 false ,没有超时时间,那就直接返回呗
- 计算出它的截止时间,当前时间 加上 它的超时时间,也就是截止时间了,然后调用 addWaiter 将当前Node加入到队列的尾部,然后声明了一个失败标记
- 接下来就开始自旋了,自旋处理的逻辑和上述方法中的基本上是一样的,只不过 每自旋一次,都会计算一次超时时间,如果超时时间到了,也就是小于 等于0了,那就直接返回false
- 接下来调用 shouldParkAfterFailedAcquire 方法,如果该方法返回true, 表示会将当前节点中的线程进行Park住,然后判断超时时间 是否 大于 spinForTimeoutThreshold(1000) 纳秒,如果是大于1000纳秒的,那么才进行park,park到超时时间,会自动醒过来,如果小于等于1000的话,就不会再进行睡了(不值当的睡了),而是去自旋,这是为了提高程序的响应时间,这里认为,如果你的超时时间还剩不到1000纳秒的话,如果睡的话,程序开销是大于自旋的开销的
- 下面 如果该线程已中断,则直接抛出中断异常
tryAcquireSharedNanos方法分析
其实该方法和上述方法实现原理差不多
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
如果该线程 已被中断,则立即抛出异常 然后调用tryAcquireShared方法,如果该方法 返回了大于等于0的数,表示拿到了锁,就直接返回了,否则会调 doAcquireSharedNanos 方法去尝试获取锁
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
和上面的处理逻辑基本上是一模一样的,只不过它处理的是共享锁获取,这里就不再赘述了
上面主要对 AQS 中的一些比较重要的一些方法,进行了原理分析,下面再分析一下AQS中有一个公共的内部类,也是特别重要的一个类,它是 ConditionObject ,里面有 await , signal 等方法 也是特别的重要
ConditionObject 源码分析
ConditionObject中还维护了一个 condition queue 的 队列
比较重要的公共方法如下:
// 和等待相关的一些方法
public final void await() throws InterruptedException
public final long awaitNanos(long nanosTimeout) throws InterruptedException
public final boolean await(long time, TimeUnit unit) throws InterruptedException
// 和唤醒相关的两个方法
public final void signal()
public final void signalAll()
下面就着重分析下 上面几个方法的实现与原理
await方法分析
首先上源码,这个方法有点像 Object 里面的 wait() 方法,用于实现线程的同步等待, 一般能执行此方法的线程 都是拿到了锁的, 否则执行的时候 会抛出 IllegalMonitorStateException 异常
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- 首先它声明了 InterruptedException, 说明 它可能会抛出 中断异常
- 然后调用 addConditionWaiter() 方法,返回了一个node. 此方法表示将 当前线程 封装成一个node 加到 condition队列中
- 然后调用 fullyRelease 方法进行释放锁,然后记录中断处理的类型
- 之后会去判断 当前node 是否在同步队列中,如果在,则不做处理,接着往下走,如果不在,则进入循环体内,执行 LockSupport.park 方法,将当前线程进行挂起,之后判断中断状态,这里为啥要判断是否在同步队列中呢?原因是 如果有其它线程调用了 signal 进行了唤醒 wait 状态的线程的话,会将其节点再次移动到 CLH 队列中
- 如果当前 node 不在queue中了,则继续尝试去获取锁,然后判断中断状态, 进行设置
- 判断 当前 node 是否是最后一个节点,如果不是,要清除这个 取消节点了,顺便把其它的取消节点也给清掉
- 最后判断 interruptMode 不等于 0 ,进行处理 中断
从上面的源码 可以 看到,其处理貌似挺复杂的,需要一个一个的再分析下具体的方法执行流程,首先便是 addConditionWaiter 方法的分析,直接上其源码
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
- 首先拿到 lastWaiter,就是 condition 队列的尾部,如果它不为空,并且它的 waitStatus 状态 不是 CONDITION, 这时候 就需要重新进行连接整理condition queue了,把不是CONDITION状态的节点给剔除掉
- 将lastWaiter 重新赋值给 t , 接下来 创建 Node 节点,节点类型是 CONDITION 类型,也就是 -2
- 如果 t == null , 那么就说明 condition queue中没有任何元素啊,就进行一个初始化,将 node 赋值给 firstWaiter, 否则的话,就将 node 节点连在 最后,然后将 lastWaiter 指向 最后的 node 节点,然后将node返回
接下来,需要看下 unlinkCancelledWaiters() 方法都做了 哪些事,直接上源码
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
- 其实 如果直接这样看下去的话,可能会有点蒙,下面 我会结合图 来进行分析
- 将 firstWaiter 拿过来,赋值给 t, 然后声明一个 trail 为 null, 接下来就开始循环了,如果 t 不等于空 才进行循环,t 不等于空,才说明 condition queue中 存在元素
- 把 t 的下一个节点拿到,判断 t节点的waitStatus 是否是 CONDITION,如果是,就记一下t ,保存到trail中,然后接着往下循环
- 如果不是CONDITION了,就要移除它了,首先将 t 的 nextWaiter 置为 null, 这样便于 GC 回收掉它
- 如果 trail == null, 那么就说明,t 还没有 赋值给 trail, 也就是说,当前的 t 还是 firstWaiter , 那就直接 让 firstWaiter 指向 t 的下一个节点了, 如果 trail 不为空,那么就说明 已经到 中间某个节点了,trail 在这儿指向的是 t 的 前一个节点,trail.nextWaiter = next 那么这句代码的意思就是,将 t 的前一个节点的 nextWaiter 指向 t 的后一个节点,也就直接跳过了 t 节点
- 然后判断下是否到尾部了,如果 next == null, 就 说明 已经遍历到尾部了,那么就直接将 lastWaiter 指向 t 的前一个节点,此时的状态是,t 已经是 CONDITION 状态的节点了,然后它的下一个节点是 null, 那么这时候 lastWaiter节点 只能指向 t 的前一个节点了
好了,unlinkCancelledWaiters 方法 就分析到这儿了,至此 整个 addConditionWaiter 方法就分析完了
接下来 开始分析 fullyRelease 方法 是怎样实现的呢?上源码
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
- fullyRelease 顾名思义,完全释放的意思,首先声明了 失败标记,然后 使用 getState() 方法,拿到当前 state 的值,然后调用 release 方法 去尝试 释放锁,由此可见 await 方法是释放锁的,release 方法在上述代码分析中,已经分析过了
- 如果释放成功了,然后将失败标记 记为 false ,之后返回释放的 state 值,如果release 方法,返回了 false, 那就直接抛出 IllegalMonitorStateException
- 最后 finally 代码块中 判断 是否是失败状态,如果是,就将当前 node 标记为 CANCELLED
接下来 还有一个 方法 需要分析 isOnSyncQueue 这个方法它的运行原理
final boolean isOnSyncQueue(Node node) {
// 有一种可能 node 节点的 前置不为空,可能还尚未在CLH队列中,因为CAS设置到尾部的时候 是可能失败的
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
- isOnSyncQueue 这个方法主要用于 判断当前 node 有没有在 同步队列中 也就是CLH队列中,如果它的 waitStatus 状态为 CONDITION 那肯定不再 CLH 队列里面呀,因为上面已经分析过了,在CLH队列中Node的节点状态不可能为 CONDITION,或者 node 的 前置节点为 null, 那也不可能存在于 CLH队列中,上面也已经分析过了,如果是存在于 CLH 队列中的节点,那么它的前置节点肯定是不为空的
- 如果 node 的 next 指向 不为空,那么就返回true,说明它一定存在于 CLH 队列中, 如果存在于 condition queue 中,那么其 next 和 prev 指向是为 null 的,如果其不为空,那么肯定是存在于 CLH 队列中的
- 然后 如果当前 node 的 next 指向 也为 null, 那么就开始 遍历去CLH 队列中查去了,首先 如果node 的next指向为null, 那么它就很有可能是 CLH 队列的尾结点
- 调用 findNodeFromTail 方法 从尾结点开始查找 node, 首先 将 tail 的引用赋值给t ,然后开始for循环,如果 t 是 当前要找的 node , 那就直接返回 true, 如果 t == null, 那说明到头了,没找到,返回 false , 然后将 t 再 赋值指向 t 的 前置,如此循环往复,直到 t 为 null 或者是 t == node 时,结束循环
从 await 的源码中可以看出 就是 如果 isOnSyncQueue 返回 true 的话,就不会再去 LockSupport.park(this) 了
而是去再次尝试 获取锁,如果它没在 CLH 队列中,那么将会将其 挂起,等待被唤醒的时候,再去调用checkInterruptWhileWaiting这个方法,接下来 先分析下 此方法 做了哪些事
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
- 从其 方法名可以大概的看出其 方法做了啥事,检查中断状态,在等待结束之后,如果当前线程已被中断,就接着调 transferAfterCancelledWait() 方法,如果没有被中断 就直接返回 0 得了, 如果返回0 的话,又会接着去 while(!isOnSyncQueue(node)) 判断当前 node 是否在 CLH 队列中
- 如果被中断了 就调下面的方法,首先 使用 CAS 的方式 将当前 node 的waitStatus状态 从 CONDITION状态设置成 0, 如果成功了 开始调用 enq(node) 方法,将其添加到 CLH队列的尾部,然后 返回 true, 如果 返回true的话,就 然后 THROW_IE, 否则返回 REINTERRUPT , 这里为什么要这样处理呢? await() 方法是可以相应中断的,如果在此方法中 调用 compareAndSetWaitStatus 方法时,返回了false , 那么说明此时,signal 方法已经被调用了,因为 signal 方法中 会有 这么一行代码 compareAndSetWaitStatus(node, Node.CONDITION, 0),所以,此时线程中断,如果CAS成功了,说明 signal 方法 还没被调用,否则 已经被调用了,只是顺序,我们是无法确定的,可能是 signal → 中断 → 监测 → CAS, 或者 中断 → 监测 → signal → CAS 都有可能,这里统一 按照 signal 先发生的去处理
- 如果 使用 CAS 将其node 的 waitStatus 设置成 0 失败,则开始调用 while(!isOnSyncQueue(node)) 方法,如果不在 CLH 队列中,就调用 Thread.yield() 方法,让出CPU执行权,让其它线程有机会执行,直到 该node 存在于 CLH 队列中了,才循环结束,同时返回 false ,如果返回false , 则返回 REINTERRUPT
await 方法 最后会在拿到锁之后(acquireQueued方法之后)对 interruptMode 做出处理,如果它不等于0 的话,就调用一个方法进行处理, 还有 如果 acquireQueue() 方法 返回了 true, 则表示,线程已被中断,这时候,会去判断interruptMode,如果它不等于 THROW_IE 的话,就标记成 REINTERRUPT, 如果是0 的话,也会标记为 REINTERRUPT,重新进行中断处理
await 方法最后 对 interruptMode 这个标记做出处理
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
- 从上述代码中 可看出 如果 interruptMode == THROW_IE 就重新抛出异常,而 interruptMode == THROW_IE 也说明了 此中断 是发生在 await 期间的,所以需要抛出
- 而如果 interruptMode == REINTERRUPT 则,说明 中断是 发生在 singnal 之后的,其实就是 在中断的时候 signal 掺和进来了,或者是 在获取锁的时候,等待获取锁的过程中,被中断了,这时候就重新标记下中断状态
至此 await 方法的分析 就大功告成了,可以看出 其代码量可能不多,但是它的细节特别的多,就是考虑的点也特别的多,包括对中断的处理等等,下面就来分析下 signal 方法底层是怎么实现的了
signal方法分析
signal 方法主要用于唤醒 处于 await 状态的 一个线程,下面开始分析其实现原理
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
- 如果当前排它锁的持有者 不是当前线程的话,那就直接抛出异常了 IllegalMonitorStateException
- 拿到 第一个 等待节点,如果不为空的话,就开始去处理了, doSignal 方法去唤醒它 , 下面需要看下 doSignal 方法是怎么实现的
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
- 首先 将 first 节点的下一个节点置为 firstWaiter, 如果等于 null 了,那说明后面没有节点了呀,直接将lastWaiter 也指向 null
- 将 first.nextWaiter 置为 null 因为该节点 要从 condition queue 中拿掉了
- 开始调用 transferForSignal 方法, 将first 从 condition queue 中 移动至 CLH 队列,如果成功了 返回true, 此方法也就结束了,否则再重新赋值 first ,从firstWaiter 再开始, first 不为空的话,接着尝试去移动
下面看下 transferForSignal 方法是如何实现的,此方法是 signal 方法的核心方法
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- 首先一上来 便是一个 CAS 操作,将node的 CONDITION 状态改为0,这也呼应了 上文中的中断处理部分的代码,如果CAS 成功了,才往下执行,否则,就直接任务失败了,返回false
- 往下执行便是 enq 方法,入队,通过自旋来保证一定能够入队成功,这个enq 方法 返回了一个node, 通过上面分析 enq 方法原理,可得出,这个p指的其实是 node 节点的前置节点
- 判断 node 前置节点的waitStatus状态,如果是大于0 的,说明已经取消了,那就尝试将其 waitStatus 状态改为SIGNAL状态,以保证其能够正常唤醒后续节点,如果这步CAS操作 也失败了,那就直接唤醒 node 中的线程吧,让其自己去想办法后续怎么处理 其实通过上面分析,已经知道其会怎么做了,肯定会自旋的去尝试获取锁,然后判断自己应不应该park住,如果想要park住,必须设置前一个节点的waitStatus为SIGNAL
- 最后方法返回 true, 表示执行成功
下面上张图
[图片上传失败...(image-d20902-1649039795372)]
signalAll方法分析
到此,signal 方法也就分析完了,可见,它的处理逻辑其实不是很复杂的,接下来分析 signalAll 方法,此方法用来唤醒所有的等待, 接下来 上源码
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
- 如果获取独占锁的线程 不是 当前线程的话,直接抛出 IllegalMonitorStateException
- 拿到 firstWaiter 节点,让 first 指向它,然后如果不等于 空的话,说明 condition queue中有值,紧接着调用 doSignalAll(first) 方法就可以了
下面来看下 doSignalAll 方法都干了哪些事
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
- 其实 doSignalAll 方法 也就是清空了 condition queue, 将 lastWaiter 和 firstWaiter 均指向 null
- 然后就开始 循环处理了,直到 first == null , 其处理逻辑是,先将 first 的 nextWaiter 保存下来,记为next,然后将 first的nextWaiter 置为 null, 然后 调用 transferForSignal 方法 将 first 节点移至 CLH 队列中, 然后 first 再移动 指向 它的下一个节点,也就是再指向 next 节点,其实就是一个链表操作,transferForSignal 方法 上面已经分析过了,就是 更改node 的 waitStatus 状态,然后 调用 enq 方法 将其加入到 CLH 队列中
下面来看下 带有超时时间的 await 方法是如何实现的,当获得锁的线程 调用 此方法时,会释放锁,然后再被唤醒之前,或者中断之前,或者到达指定时间之前,一直处于等待状态,time 为最长等待时间, unit 为等待单位,如果此方法 返回前,监测到等待时间超时,则返回false , 否则返回 true
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSuport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
- 将时间转换成 纳秒,然后检查当前线程是否已中断,如果当前线程已被中断,则立马抛出中断 异常
- 下面的处理逻辑 就和 await 方法中的一样了,首先调用 addConditionWaiter 方法 将当前线程封装成 node 加入到 condition queue 中, 然后调用 fullyRelease 方法,进行释放锁,接下来就要记录超时时间了,然后记录中断状态
- 接下来的判断和 await 方法中的 基本一致,判断当前 node 是否在同步队列中,如果是,直接跳出 while , 表示当前 node 中的线程 已经被 signal 了,无需再去等待了,直接去 CLH 队列中 尝试 获取锁吧,这部分的逻辑 其实和 await 方法中的一样
- 不同的是,当 await 的线程 还没有被唤醒时,进行的处理是不同的,直接在 while 循环中 判断 当前 传进来的 nanosTimeout 是否小于等于0,如果是,说明时间到,已经超时了呀,调一下 transferAfterCancelledWait 方法,这个方法 在上面已经分析过了,这里 再进行一次分析,主要检查当前线程是否被 signal 唤醒,如果没有,就 调 enq 入队了,返回true, 否则就 while 循环判断,直到 将 node 放置在 CLH 队列中 才返回 false,这时候 timeout 就是此方法返回回来的值,接着开始去尝试获取锁,然后处理中断,最后返回 timeout取反,也就是说 如果你超时了,但是没有被signal 唤醒 最后是返回 false 的,如果被 signal 唤醒了,那么最后一取反是返回 true 的
- 接着分析 while 循环里面的逻辑,如果 nanosTimeout ≥ spinForTimeoutThreshold ,那么会进行线程的挂起,也就是 调用 LockSupport.parkNanos(this, nanosTimeout), 等到挂起一定时间,会自动醒过来, 然后会计算一下超时时间,然后去判断是否已经超时了
- 之后就是处理中断了,和上面的 await 的处理逻辑是一样的
下面再来分析下 awaitUninterruptibly 方法的实现原理,其实这个方法 就比较简单了,顾名思义,不可中断的await,所以 此方法 就没有像上面两个方法一样,去做中断的一些处理,它就比较简单了
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
此方法的运行逻辑 其实和上面基本一致,首先调用 addConditionWaiter 方法,将其加入到 condition queue 中
释放锁资源,然后 循环判断 其是否已经被唤醒,如果没有,就会去 调用 LockSupport的park 方法,进行挂起
这时候其实会去判断 当前线程 是否已中断,如果是,则将中断状态置为 true ,标记上 已经被中断了, 下面 如果该
线程已经被放到CLH队列中了,说明其已经被唤醒了,这时候就要尝试去调用 acquireQueued 方法去获取锁了,
如果返回了true , 表示已经被中断了,所以 调用 selfInterrupt 方法 重新标记中断
到此,AQS 中其大部分的核心代码已经分析完了,还有一些非核心方法 接着往下分析
其它方法分析
isQueued方法分析
这个 方法比较简单,那么直接上源码就好了
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
这个方法 做的事情其实比较 简单,主要就是判断 当前 传进来 的线程 是否在 CLH队列中,如果传进来的线程 为
null, 那就直接抛出 空指针异常,否则,从后往前进行遍历 查找 队列中 有没有这个线程,如果有,就返回true, 否
则就返回 false, 至于 为何 从后往前 遍历,其实 前面已经分析过了
hasQueuedPredecessors方法分析
该方法主要判断 当前线程 是否是 head 节点的 下一个节点 维护的线程,如果对于公平锁而言,也就是 是否是正在获取锁的线程,下面看看其源码
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- 拿到 tail 引用 和 head 引用
- 判断 head ≠ tail, 这样说明 CLH 队列中存在 node 节点,然后 将s 置为 head 节点的下一个节点, 如果不为空的话 接着判断 s 中维护的线程是否是当前线程 如果不是 返回true
- 这个方法 见名知意,其实就是判断 当前线程 在queue中 前面是否有排队的,正在处理的线程,如果有 返回true
下面几个方法的逻辑其实都差不多,遍历 CLH 队列,然后进行统计
getQueueLength方法分析
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
此方法 用于 获取 CLH 队列的长度,从后往前遍历,如果存在 thread 不为空,就 +1, 然后统计当前 CLH 队列中存在的线程数
getQueuedThreads方法分析
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
此方法 是拿到 CLH 队列中所有的 thread , 收集到一个 List 中,然后返回
getExclusiveQueuedThreads 方法分析
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
此方法 从后往前进行遍历,拿到所有的 非共享节点的node,然后取其线程 收集到 list 中,最后返回
getSharedQueuedThreads方法分析
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
此方法 从后往前进行遍历,拿到所有的共享节点的node,然后取其线程,收集到list中,最后返回
hasWaiters方法分析
public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
public final boolean owns(ConditionObject condition) {
return condition.isOwnedBy(this);
}
////// ConditionObject 类中
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
- hasWaiters 此方法用于 判断 传进来的 condition 中是否存在 等待节点, 首先调用的 owns 主要用于判断 当前condition 是否属于 当前的 AQS ,每个锁 是可以创建 多个 condition 的,你不能把别的锁 创建的 condition 传到另一个锁的方法里面去啊,owns 主要判断 当前 传进来的 condition 是否属于 当前的这个锁
- 然后调用 hasWaiters 方法进行判断,首先判断 如果获取 独占锁的线程 非当前线程,那么就直接抛个异常出去了,否则 从 firstWaiter 开始遍历 然后判断 node 的 waitStatus 状态 有没有 等于 CONDITION 的,如果有,就返回 true, 说明 存在 await 的线程,否则 遍历完都没有的话,那么就直接返回false了
最后,完结撒花,关于 AQS 中的大多数方法,其实已经分析的差不多了,可能还存在着 个别的 方法没有分析,那么可能是,有和已经分析过的 方法是类似的,所以,就没有再分析了,关于 AQS 基本上 了解到这个程度,我感觉已经挺深的了,可能 我还有一些 细节上的东西,没有挖掘出来,那么后续,我将在学习过程中,继续补充完善,后面的话,我将再接着 分析 和 AQS 相关的一些类, 再次感叹 AQS 的作者 Doug Lea 的伟大,致敬!!!
网友评论