一、写在前面
上篇文章通过ReentrantLock 的加锁和释放锁过程给大家聊了聊AQS架构以及实现原理,具体参见《带你走进AQS内心世界》。
作为一个合格的 攻城狮 光是这些肯定无法满足我们的胃口了。
于是……
源码! 源码! 源码 呐喊声连绵不绝。
好吧!这章我们就再续前缘一起来聊一聊AQS源码的那些事吧,源码的解读比较枯燥,希望大家做好准备。
二、队列同步器
队列同步器AbstractQueuedSynchronizer(以下简称同步器)
并发包下用来构建锁或者其他同步组件的基础框架
主要实现方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。
同步器通过一个int类型的成员变量state来表示同步状态。
当state > 0 表示已经获取到了锁。
当state = 0 表示资源释放了锁。
其提供了三个方法,getState()、 setState(int newState)和compareAndSetState(int expect, int update)来操作同步状态,当然同步器能够保证状态的改变是安全的。
同步器通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
到这有没有对同步队列懵逼的??如果有也别着急,在下面源码解析中我会化一些图来解析下。
三、队列同步器主要接口列表
同步队列器设计是基于模版方法模式的,也就是说使用者需要继承同步器并重写指定的方法。
主要接口列表
1 getState():返回同步状态的当前值;
2 setState(int newState):设置当前同步状态;
3 compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
4 tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
5 tryRelease(int arg):独占式释放同步状态;
6 tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
7 tryReleaseShared(int arg):共享式释放同步状态;
8 isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
9 acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
10 acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
11 tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
12 acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
13 acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
14 tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
15 release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
16 releaseShared(int arg):共享式释放同步状态;
同步器提供的模版方法基本分为三种
独占式获取与释放同步状态、共享式获取与释放同步状态、查询同步状态中等到线程情况
了解了同步器的基本内容下一章节我们就该进入源码了,源码主讲解
acquire-release 而acquireShared-releaseShared 的执行过程和前面大同小异就不再这讲了。
四、进入源码世界
独占式同步状态获取与释放
同步状态的获取是通过同步器的acquire(int arg )方法来获取的、该方法对中断敏感,也就是由于线程在获取同步状态失败后会进入同步队列中、后续线程的中断操作,线程不会从队列中移出。代码清单
acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
上述代码主要完成同步状态的获取,节点的构造、加入队列以及在队列中自旋等待等相关工作。
流程如下
- 通过tryAcquire(arg) 方法保证线程安全的方式获取同步状态获取失败则构造同步节点(独占式Node.EXCLUSIVE)
- 通过addWaiter(Node node)方法将该节点加入到队列的尾部,最后调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 使得该节点以自旋的方式获取同步状态,如果获取不到则阻塞该节点中的线程,等待被唤醒。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。selfInterrupt()
addWaiter(Node node)
private Node addWaiter(Node mode) {
// 以给定的模式来构建节点, mode有两种模式
// 共享式SHARED, 独占式EXCLUSIVE;
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速将该节点加入到队列的尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果快速加入失败,则通过 anq方式入列
enq(node);
return node;
}
enq(final Node node)
private Node enq(final Node node) {
// CAS自旋,直到加入队尾成功
for (;;) {
Node t = tail;
if (t == null) { // 如果队列为空,则必须先初始化CLH队列,新建一个空节点标识作为Hader节点,并将tail 指向它
if (compareAndSetHead(new Node()))
tail = head;
} else {// 正常流程,加入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这是同步队列入列的过程, 代码的注释已经很明白了, 我在这就不多解释,
什么? ~~~要看图~~~~好吧!就手绘两张吧!
CLH队列面貌
同步队列面貌.jpg
入列过程
同步队列器入列.jpg
这下满足了吧! 我们接着往下走!
final boolean acquireQueued(final Node node, int arg) {
// 是否拿到资源
boolean failed = true;
try {
// 标记等待过程中是否被中断过
boolean interrupted = false;
// 自旋
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果其前驱节点为head 节点,说明此节点有资格去获取资源了。(可能是被前驱节点唤醒,也可能被interrupted了的)
if (p == head && tryAcquire(arg)) {
// 拿到资源后将自己设置为head节点,
setHead(node);
// 将前驱节点 p.next = nul 在setHead(node); 中已经将node.prev = null 设置为空了,方便GC回收前驱节点,也相当于出列。
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果不符合上述条件,说明自己可以休息了,进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
} finally {
if (failed)
cancelAcquire(node);
}
}
此方法当前线程在‘死循环’中尝试获取同步状态,只有其前驱节点为head节点时才有尝试获取同步状态的资格。
Why?
- 因为只有head是成功获取同步状态的节点,而head节点的线程在释放同步状态的同时,会唤醒后继节点,后继节点在被唤醒后检测自己的前驱节点是否是head节点
- 维护CLH的FIFO原则。该方法中节点自旋获取同步状态。
waht? 还要图~~~~~~。好吧满足你。
节点自旋获取同步状态.jpg
接着往下走吧!
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到前驱的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果已经告诉过前驱节点,获取到资源后通知自己下,那就可以安心的去休息了。
return true;
if (ws > 0) {
// 如果前驱节点放弃了,那就循环一直往前找,直到找到一个正常等待状态的节点,排在他后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前驱状态为0 或者 PROPAGATE 状态, 那就把前驱状态设置成SIGNAL,告诉它获取资源后通知下自己。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
次方法在自己前驱节点不是head节点时再次回到同步队列中找到一个舒适位置休息等待被唤醒。
private final boolean parkAndCheckInterrupt() {
// 调用park方法是线程进入waiting 状态
LockSupport.park(this);
//如果被唤醒查看是不是被中断状态
return Thread.interrupted();
}
最后调用park方法时线程进入wating状态。
到这独占方式的获取同步状态已经聊完了。 我接着看释放同步状态的过程。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒head节点的后继节点。
unparkSuccessor(h);
return true;
}
return false;
}
此方法执行时,会释放同步状态(也就是将state值设置为0),同时会唤醒head节点的后继节点。
到这独占式获取同步和释放同步状态的源码已经分析完了。 有没有懵尼? 懵了也别怕最后我们再来张流程图帮助大家理解。
独占锁获取同步状态.jpg
结合上面源码分析,应该对AQS源码有所了解了吧。
五、总结
分析了独占式同步状态的获取和释放过程,适当做下总结: 在获取同步状态时,同步器维持一个同步队列,获取状态失败的线程都会加入到队列中并在队列中进行自旋,出列(或者停止自旋的)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
网友评论