简单介绍
AQS
(AbstractQueuedSynchronizer
)是一个用来构建锁和同步器的框架,其中同步指的是线程之间的通信和协作。Lock
包中的各种锁和 concurrent
包中的各种同步器都是基于它来构造的,例如 ReentrantLock
, CountDownLatch
。
由于 AQS
是基于 Java
并发包中管程的一种实现,所以在学习 AQS
之前,先来了解一下什么是管程吧
管程提供了一种机制,线程可以临时放弃互斥访问,等待某些条件得到满足后,重新获得执行权恢复它的互斥访问。即:在管程中的线程可以临时放弃管程的互斥访问,让其他线程进入到管程里。
管程
管程由四个部分组成:
- 管程内部的共享变量
- 管程内部的条件变量
- 管程内部并行执行的进程
- 对于局部与管程内部的共享数据设置初始值的设置
管程就像是一个对象监视器,任何线程想要访问该共享变量就要排队进入监控范围。进入之后,不符合条件的,要继续等待,直到被通知,然后继续进入监视器。
示意图那管程是如何解决互斥和同步呢?举个例子,去医院看医生,就诊室是一个临界区,医生是临界区里的共享变量,病人是线程,需要在门口一个一个排队进入就诊室,并且一次只能进去一个。在医生给病人看病的时候,外面的病人是无法进入就诊室的,直到该医生看完该病人通知下一个病人进来看病,这样就解决了互斥的问题。假如有位病人进入就诊室后,医生叫他先去做一下CT才能继续看病,然后这个病人他就去先去排队做CT了(条件队列),同时释放了医生这个共享变量,通知下一个病人进来看病,当这个病人做完CT,他又要拿着CT报告回到就诊室门口排队看医生,这就是同步。
AQS实现原理
AQS维护了一个共享资源 state
和一个 FIFO
的等待队列,底层利用了 CAS
机制来保证操作的原子性。
以实现独占锁为例:
-
state
初始为 0 - 某个线程获取成功后,
state
加 1 - 其他线程再想获取的话,共享资源已经被占用,到
FIFO
队列里排队 - 占有的线程执行完成,释放资源,
statet
减 1 - 唤醒等待队列中的线程去获取
state
由于 state
是多线程共享变量,所以需要定义为 volatile
以保证可见性,但是不能保证原子性,所以 AQS
提供了对 state
的原子操作方法,保证了线程安全。
另外 AQS
中实现的 FIFO
队列(CLH
队列)其实是双向链表实现的,由 head
tail
节点表示,head
结点代表当前占用的线程,其他节点由于暂时获取不到锁所以依次排队等待锁释放。
以上内容从AbstractQueuedSynchronizer的定义也可以看出:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
private transient volatile Node head; //头节点
private transient volatile Node tail; //尾节点
private volatile int state; //共享变量
...
//CAS操作,保证state的原子性
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
...
}
源码解析
我们以 ReentrantLock
非公平锁(独占,非公平模式)为例。ReentrantLock
的使用方法可以具体看上一篇文章:ReentrantLock
获取锁
这里我们直接看到 NonfairSync
的 lock()
方法:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
- 使用
CAS
来获取state
值,如果成功设置1,代表state
资源获取锁成功 ,此时记录下当前占用state
的线程 - 如果使用
CAS
设置state
为 1 失败,代表获取锁失败,则执行acquire(1)
方法
acquire(1)
方法是 AQS
提供的
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先 调用 tryAcquire
尝试着获取 state
,如果成功,则跳过后面的步骤。如果失败,则执行 acquireQueued
将线程加入 FIFO
等待队列中。
tryAcquire()
是 AQS
提供的模板方法,最终由 Sync
(AQS
的一个具体实现类)实现,代码如下
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 当
state
为 0 时,代表锁已经被释放,使用CAS
去获取,获取成功,则记下此时当有锁的线程 - 当
state
不为 0,代表已经有线程占有了锁,如果此时的线程是占有的线程,则更新state
如果获取锁失败,即 tryAcquire
执行失败,则执行 acquireQueued
方法,将线程放入 FIFO
队列
执行acquireQueued之前,会先执行addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter
的逻辑比较清楚,就是把包含当前线程信息的节点插到等待队列中:
- 先获取
FIFO
尾节点tail
- 存在
tail
,采用CAS
的方式将等待线程入队 - 如果
tail
为空,则执行enq
方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
先判断 tail
是否空,是则代表 FIFO
队列还未构建,此时先构建头节点,后再以 CAS
方式将此线程节点入队。
注意:head
节点为虚节点,不记录占用 state
的是哪个线程,它只代表有线程占有了 state
当前线程入队成功之后,该执行 acquireQueued
方法了。
在看 acquireQueued
方法前,我们先假设当前 state = 0
,现在有 T1, T2, T3 这三个线程要去竞争锁
T1竞争成功,
state
加 1,并且由于 FIFO
未构建,所以先创建 head
节点T1竞争成功
之后T2,T3再去竞争锁,竞争失败,需要入队
T2,T3入队
问题来了,T2,T3入队后怎么处理呢,马上阻塞的话意味着要切换线程状态,从运行状态转为阻塞态,涉及到了用户态向内核态的切换,相反唤醒需要从内核态转为用户态,开销相对比较大,所以
AQS
对这种入队线程采用 自旋 的方式来竞争锁T2,T3自旋
独占模式下,如果T1一直占用锁,T2,T3则一直自旋没太大意义,反而会占用CPU,影响性能,所以更适合的方法是自旋一两次之后阻塞等待前节点唤醒。
另外,如果锁在自旋过程中被中断了,或者超时,应该处于取消状态。
基于每个
Node
可能处于的状态,AQS
为其定义了一个变量 waitStatus
,根据这个变量值对响应节点进行相关的操作:
static final class Node {
static final Node SHARED = new Node(); // 标识等待节点处于共享模式
static final Node EXCLUSIVE = null; // 标识等待节点处于独占模式
static final int CANCELLED = 1; // 由于超时或中断,节点已被取消
static final int SIGNAL = -1; // 后续节点已被阻塞,当该节点释放或者取消的时候需要断开与后续节点的链接
static final int CONDITION = -2; // 标识节点正在等待条件变量
static final int PROPAGATE = -3; // 标识后续节点会传播唤醒的操作,共享模式下起作用
volatile int waitStatus; //等待状态,对于 condition 节点初始值为 CONDITION,其他情况下默认为 0,通过CAS 原子更新
...
}
了解了状态的定义之后,再来看 AQS
对自旋的处理:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 先获取前节点
if (p == head && tryAcquire(arg)) { // 如果前节点是 head,则尝试自旋获取锁
setHead(node); // 将当前节点设置为 head,原 head 节点出队
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果前节点不是 head 或者竞争锁失败,则进入等待队列
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果自旋过程中因为异常等原因最终失败,则调用此方法
if (failed)
cancelAcquire(node);
}
}
-
当前节点是
当前节点是 head 并且获取锁成功的处理head
并且获取锁(tryAcquire
)成功:
把head
指向当前节点,并且让原head
节点出队private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
将
head
设置为当前节点后,要把节点的thread
,pre
设置为null
,head
是虚节点,不保留除了waitStatus
之外的信息,因为当前占有锁的线程已经被exclusiveThread
记录了,如果head
再记录thread
不仅多此一举,反而在释放锁的时候还要多一个head
的thread
的释放操作。 -
如果前节点不是
head
,或者获取锁(tryAcquire
)失败:
首先它会调用shouldParkAfterFailedAcquire
方法判断是否应该停止自旋进入阻塞状态private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 获取前节点的状态 if (ws == Node.SIGNAL) // 如果前节点是 SIGNAL 状态,则该节点可以阻塞了 return true; if (ws > 0) { // 如果前节点是取消状态 do { node.prev = pred = pred.prev; // 移除取消状态的节点 } while (pred.waitStatus > 0); pred.next = node; } else { // 当前节点状态为0或者传播状态,设置前节点的状态为 SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
- 当前节点的状态为
SIGNAL
时,后续节点进入阻塞
T2,T3 的前驱节点的 waitStatus 都为 SIGNAL
如图所示,T2,T3的前驱节点的waitStatus
都是SIGNAL
,所以T2,T3此时都可以阻塞 -
前驱节点为取消,移除当前节点之前的所有取消的节点
取消节点
如图所示,当前节点T4的前驱节点T3为取消状态,T2也为取消状态,执行完这段代码之后,T3 T2都会被移除。
- 如果前节点小于等于 0,则要把前节点的状态设置为
SIGNAL
,这样下一次自旋后发现前节点为SIGNAL
时,该节点就会进入阻塞(即步骤1)。
当
shouldParkAfterFailedAcquire
返回了true
,则代表线程可以阻塞了,那么parkAndCheckInterrupt
方法就会让线程转换为阻塞状态private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
- 当前节点的状态为
-
最后,如果自旋过程的发生异常,
Node
被置为取消状态private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; // 如果前节点是取消节点,则将当前节点的前置节点设置为之前的第一个非取消状态的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 经过过滤的前节点的 next 节点 Node predNext = pred.next; // 设置当前节点的状态 node.waitStatus = Node.CANCELLED; // 如果当前节点是尾节点,则将尾节点设置为其前驱节点,如果成功,则把尾节点的 next 设置为空 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; // 如果前节点不是头节点,并且状态为 SIGNAL 或者成功设置为 SIGNAL,就将当前节点的后继节点设置为前节点的后继节点 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果前节点是头节点,或者前节点设置 SIGNAL 状态失败,则唤醒后继节点去竞争锁 unparkSuccessor(node); } node.next = node; // help GC } }
- 假设当前节点之前有取消节点
当前节点之前有取消节点
当前节点的pre
会指向之前第一个非取消状态的节点 - 如果当前节点不是尾节点也不是头节点的后继节点
当前节点不是尾节点也不是头节点
当前节点的前节点的后继节点链接当前节点的后继节点(好绕 - - ||),当前节点的后继节点指向自己
,当T4节点执行自旋代码时,T4的前置节点会指向之前的非取消节点
image.png
中间的两个CANCAL
节点变为不可达,就会被GC了 - 如果当前节点是尾节点(
tail
节点)
image.png -
如果当前节点的前节点是头节点,最终结果如下
image.png
同样中间节点会在
tail
自旋时后变得不可达。 - 假设当前节点之前有取消节点
以上就是 AQS
获取锁的流程。
释放锁
不管是公平锁还是非公平锁,释放锁最终调用的都是 AQS
的 release
方法来释放锁。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
方法定义在了 AQS
的子类 Sync
方法里(具体可以看上一篇文章)
分析一下为什么唤醒线程的条件是 h != null && h.waitStatus != 0
- 如果
h == null
,一种情况是一个线程在竞争锁,但是现在它释放了,没有所谓的后继节点,就不需要唤醒,还有一种情况就是有线程在竞争锁,但是还没构建头节点,此时线程已经在运行了,也不需要唤醒。 - 如果
h != null
,并且h.waitStatus == 0
,说明后继节点正在自旋竞争锁,无需唤醒 - 如果
h != null
,并且h.waitStatus < 0
,此时 waitStatus 值可能是SIGNAL
或者PROPAGATE
,这两种情况说明后继结点阻塞需要被唤醒
唤醒线程:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 将 head 的 waitStatus 设置为 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 取队列第一个非取消状态的节点
Node s = node.next;
// 如果 s 为 null 或者为取消状态,则从尾向前获取最后一个非取消状态的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
为什么是从尾向前找呢,因为节点在入队的时候是这样的:
private Node addWaiter(Node mode) {
...
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
...
}
先执行 node.prev = pred
后执行 pred.net = node
,如果唤醒的操作是在两者之间,从前往后找的时候会找不到 head 的后继节点。
网友评论