AQS
-
AQS全称是Abstract Queued Synchronizer,翻译为同步器,它是一套实现多线程同步功能的框架。AQS在源码中被广泛使用,特别是在java高并发编程中,比如ReentrantLock,Semaphore,CountDownLatch和ThreadPoolExecutor,在实际开发中也可以通过自定义AQS来实现各种需求场景。
-
ReentrantLock的同步功能的原理就是通过AQS来实现的
1.ReentrantLock的加锁方法lock()
public class ReentrantLock implements Lock, Serializable {
private final ReentrantLock.Sync sync;
public ReentrantLock() {
this.sync = new ReentrantLock.NonfairSync();
}
public ReentrantLock(boolean fair) {
this.sync = (ReentrantLock.Sync)(fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync());
}
public void lock() {
this.sync.lock();
}
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
final boolean nonfairTryAcquire(int acquire) {}
protected final boolean tryRelease(int release) {}
protected final boolean isHeldExclusively() {}
final ConditionObject newCondition() {}
final Thread getOwner() {}
final int getHoldCount() {}
final boolean isLocked() {}
private void readObject(ObjectInputStream var1) {}
}
}
- 解析
- ReentrantLock的lock方法调用的是sync的lock方法,sync是ReentrantLock的变量Sync的对象实例
- Sync是ReentrantLock的静态内部类,该类继承自AbstractQueuedSynchronizer
- Sync类有有两个实现类-NonfairSync 和 FairSync,这两个类也是ReentrantLock的静态内部类,分别对应非公平锁和公平锁。
- 这两个类通过ReentrantLock的带参构造函数确定实现,默认sync为非公平锁NonfairSync实现
NonfairSync实现源码如下:
static final class NonfairSync extends ReentrantLock.Sync {
private static final long serialVersionUID = 7316153563782823691L;
NonfairSync() {
}
final void lock() {
//1.通过cas操作来修改state状态,表示争抢锁的操作
if (this.compareAndSetState(0, 1)) {
//2.修改状态成功,则设置当前获得锁状态的线程为当前执行线程
this.setExclusiveOwnerThread(Thread.currentThread());
} else {
//3.修改状态失败,调用AQS的acquire方法进行后续处理
this.acquire(1);
}
}
protected final boolean tryAcquire(int var1) {
return this.nonfairTryAcquire(var1);
}
}
- 解析:在非公平锁NonfairSync的lock方法中,主要操作如下
- 调用compareAndSetState方法,通过cas设置变量state(同步状态)成功,表示当前线程获取锁成功,则将当前线程设置为独占线程。
- 如果通过cas设置变量state成功,表示当前锁正在被其他线程持有,则调用AQS的acquire方法进行后续操作
AbstractQueuedSynchronizer的acquire方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
private transient volatile AbstractQueuedSynchronizer.Node head;
private transient volatile AbstractQueuedSynchronizer.Node tail;
private volatile int state;
private static final Unsafe unsafe = Unsafe.getUnsafe();
protected AbstractQueuedSynchronizer() {
}
protected final int getState() {
return this.state;
}
protected final void setState(int var1) {
this.state = var1;
}
protected final boolean compareAndSetState(int var1, int var2) {
return unsafe.compareAndSwapInt(this, stateOffset, var1, var2);
}
public final void acquire(int var1) {
if (!this.tryAcquire(var1) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), var1)) {
selfInterrupt();
}
}
protected boolean tryAcquire(int var1) {
throw new UnsupportedOperationException();
}
}
- 解析:AQS的acquire方法比较重要,该方法中主要有3个重要步骤
- tryAcquire 方法主要目的是尝试获取锁
- AQS中的tryAcquire 比较特殊,默认情况下会直接抛异常,因此需要在子类复写,所以重点看非公平锁NonfairSync的tryAcquire 的代码实现
- 如果当前线程调用tryAcquire 获取锁失败,则会调用addWaiter()方法,该方法的作用是将当前线程添加到一个等待队列中,并返回新添加队列的节点Node
- 接着会调用acquireQueued 方法,将加入到队列中的节点,通过自旋去尝试获取锁,根据情况将线程挂起或者取消。
- tryAcquire 方法主要目的是尝试获取锁
NonfairSync的tryAcquire方法
- NonfairSync的tryAcquire会调用Sync的nonfairTryAcquire方法
static final class NonfairSync extends ReentrantLock.Sync {
protected final boolean tryAcquire(int acquires) {
return this.nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
//获取当前执行的线程
Thread current = Thread.currentThread();
//获得state的值
int state = this.getState();
// state为0,表示当前无锁状态
if (state == 0) {
//无锁状态下,通过cas操作将state的值设置为1,设置当前线程持有锁
if (this.compareAndSetState(0, acquires)) {
this.setExclusiveOwnerThread(current);
return true;
}
}
// 如果state不为0,说明当前已有线程持有锁,如果当前持有锁的线程就是当前线程
// 则增加锁的重入次数,将state加1并重新设置
else if (current == this.getExclusiveOwnerThread()) {
int newState = state + acquires;
if (newState < 0) {
throw new Error("Maximum lock count exceeded");
}
this.setState(newState);
return true;
}
return false;
}
}
- 解析:tryAcquire方法的作用是尝试获取锁,并返回获取锁的结果是否成功
- AQS中有一个变量state,表示当前锁的状态
- 如果state为0表示当前是无锁状态,通过cas更新state状态的值为1,并设置当前线程持有锁,并返回true,表示获取锁成功
- 如果当前线程属于重入,则增加重入次数,返回true
- 如果上诉情况都不满足,则获取锁失败,返回false
ReentrantLock的lock方法调用流程图:
1.ReentrantLock的lock加锁流程图.png- 解析
- 在ReentrantLock执行lock()的过程中,大部分同步机制的核心逻辑都已经在AQS中实现,ReentrantLock自身只要实现某些特定步骤下的方法即可,这种设计模式叫做模板模式
- 这一设计模式在Android的Activity生命周期也有使用,因为Activity的生命周期的执行流程都已经在framework中定义好了,子类Activity只要在相应的onCreate,onPause等生命周期方法中提供相应的实现即可。
- JUC包中的其他组件如CountDownLatch,Semaphor等都是通过一个内部类Sync来继承AQS,然后在内部中通过操作Sync来实现同步。
- 这种做法的好处是将线程控制的逻辑控制在Sync内部,而对外面向用户提供的接口是自定义锁,这中聚合关系能够很好的解耦两者所关注的逻辑。
- 在ReentrantLock执行lock()的过程中,大部分同步机制的核心逻辑都已经在AQS中实现,ReentrantLock自身只要实现某些特定步骤下的方法即可,这种设计模式叫做模板模式
2.AQS核心功能原理分析
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
private transient volatile AbstractQueuedSynchronizer.Node head;
private transient volatile AbstractQueuedSynchronizer.Node tail;
private volatile int state;
static final class Node {
...
}
}
- AQS类中有几个关键的属性,Node和state
2.1.state 锁状态
- state表示当前锁状态。
- 当state=0时表示无锁状态;
- 当state>0时,表示已经有线程获得了锁,也就是state=1
- 如果同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5.
- 而在释放锁的时候,同样需要释放5次直到state=0,其他线程才有资格获得锁。
state还有一个功能是实现锁的独占模式或者共享模式
- 独占模式:只有一个线程能够持有同步锁
- 比如在独占模式下,我们可以把state的初始值设置为0,当某个线程申请锁对象时,需要判断state的值是不是0,如果不是0的话意味着其他线程已经持有该锁,则本线程需要阻塞等待。
- 共享模式:可以有多个线程持有同步锁
- 在共享模式下,比如某项操作我们允许10个线程同时进行,超过这个数量的线程就需要阻塞等待。
- 那么只需要在线程申请对象时判断state的值是否小于10,如果小于10,就将state加1后继续同步语句的执行。
- 如果等于10,说明已经有10个线程在同时执行该操作,本线程需要阻塞等待
2.2.Node双端队列节点
Node时一个先进先出的双端队列,并且是等待队列,当多线程争用资源被阻塞时会进入此队列。这个队列时AQS实现多线程同步的核心。
AQS中有两个Node指针,分别指向队列的head和tail,分别对应头节点和尾节点。
- Node类结构如下:
static final class Node {
// 该等待同步的节点处于共享模式
static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
// 该等待同步的节点处于独占模式
static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
// Node中的线程状态,和AQS中的state不同,值范围有1,0,-1,-2,-3五个之
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
// 前驱节点
volatile AbstractQueuedSynchronizer.Node prev;
// 后继节点
volatile AbstractQueuedSynchronizer.Node next;
// 等待锁的线程
volatile Thread thread;
// 用于标记当前当前节点是否是共享模式,在Node构造函数中赋值
AbstractQueuedSynchronizer.Node nextWaiter;
final boolean isShared() {
return this.nextWaiter == SHARED;
}
final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
AbstractQueuedSynchronizer.Node var1 = this.prev;
if (var1 == null) {
throw new NullPointerException();
} else {
return var1;
}
}
Node() {
}
Node(Thread var1, AbstractQueuedSynchronizer.Node var2) {
this.nextWaiter = var2;
this.thread = var1;
}
Node(Thread var1, int var2) {
this.waitStatus = var2;
this.thread = var1;
}
}
3.当前线程获取锁失败后的流程分析
锁的意义就是使竞争到锁对象的线程执行同步代码,多个线程竞争锁时,竞争失败的线程需要被阻塞等待后续唤醒。那么ReentrantLock时如何实现让线程等待并唤醒的呢?
- 在ReentrantLock.lock()方法中,会调用Sync的acquire()方法,接着调用AQS的tryAcquire 获取锁.
- tryAcquire方法需要子类(Sync)复写并实现,如果返回true说明获取锁成功,继续执行同步代码块,
- 如果tryAcquire方法返回false,也就是当前锁对象被其他线程所持有,会接着调用AQS的addWaiter和acquireQueued方法
3.1.AQS的addWaiter方法
当前线程获取锁失败后,会被添加到一个等待队列的末端,源码如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
private transient volatile AbstractQueuedSynchronizer.Node head;
private transient volatile AbstractQueuedSynchronizer.Node tail;
private volatile int state;
private static final Unsafe unsafe = Unsafe.getUnsafe();
//将线程以Node的方式添加到队列中,添加在tail节点后面
private AbstractQueuedSynchronizer.Node addWaiter(AQS.Node var1) {
// 把当前线程封装到一个新的Node对象中
AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), var1);
AbstractQueuedSynchronizer.Node pred = this.tail;
// 将node插入队列
if (pred != null) {
node.prev = pred; //前继节点为tail
if (this.compareAndSetTail(pred, node)) {
//cas替换当前尾部,成功则返回true并设置tail的next后继节点为新节点
pred.next = node;
return node;
}
}
// 插入队列失败,进入enq自旋重试入队
this.enq(node);
return node;
}
//插入新节点到队列中,如果队列未初始化则先进行初始化,然后再插入
private AbstractQueuedSynchronizer.Node enq(AbstractQueuedSynchronizer.Node node) {
while(true) {
AbstractQueuedSynchronizer.Node t = this.tail;
if (t == null) {
// 如果队列从未被初始化,则初始化一个头节点head,并赋值给tail
if (this.compareAndSetHead(new AbstractQueuedSynchronizer.Node())) {
this.tail = this.head;
}
} else {
// 将新节点node插入到tail节点的后面
node.prev = t;
if (this.compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
}
- 有两种i情况会导致插入队列失败:
- tail为空:说明队列从未被初始化,因此需要调用enq方法再队列中插入一个空的Node
- compareAndSetTail失败:说明插入过程中有线程修改了此队列,因此需要调用enq将当前node重新插入到队列末端。
- 经过addWaiter方法之后,此时线程以Node方法被加入到队列的末端,但是线程还没有被执行阻塞操作,真正的阻塞操作在方法acquireQueued中判断执行
3.2.AQS的acquireQueued 方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
private transient volatile AbstractQueuedSynchronizer.Node head;
private transient volatile AbstractQueuedSynchronizer.Node tail;
private volatile int state;
private static final Unsafe unsafe = Unsafe.getUnsafe();
final boolean acquireQueued(AbstractQueuedSynchronizer.Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
while(true) {
// 获取新节点的前继节点
AbstractQueuedSynchronizer.Node preNode = node.predecessor();
// 检测当前节点的前驱节点是否是头节点head,这是获取锁的资格
// 如果是头节点的话,则调用tryAcquire尝试获取锁
// 获取锁成功,则将头节点设置为当前节点
if (preNode == this.head && this.tryAcquire(arg)) {
this.setHead(node);
preNode.next = null;
failed = false;
boolean result = interrupted;
return result;
}
// 尝试获取锁失败,则根据前驱节点判断是否要阻塞
// 如果阻塞过程中被中断,则设置interrupted标志位为true
// shouldParkAfterFailedAcquire 方法在前驱节点状态不为SIGNAL的情况下都会循环重试获取锁
if (shouldParkAfterFailedAcquire(preNode, node) && this.parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
this.cancelAcquire(node);
}
}
}
}
- 解析
- 在acquireQueued方法中并不会立即挂起该节点中的线程,因此在插入节点的过程中,之前持有锁的线程可能已经执行完毕并释放锁,所以这里使用自旋再次去尝试获取锁。
- 如果自旋操作还是没有获取到锁,那么就将该线程挂起(阻塞)
// 根据前驱节点中的waitStatus 来判断是否需要阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
// 如果前驱节点的状态是SIGNAL(-1),则返回true,表示需要将当前线程挂起
if (ws == -1) {
return true;
} else {
if (ws > 0) {
// 前驱节点状态为取消,向前遍历,更新当前节点的前驱为往前第一个非取消节点
// 当前线程之后会再次回到循环并尝试获取锁
do {
node.prev = pred = pred.prev;
} while(pred.waitStatus > 0);
pred.next = var1;
} else {
// 等待状态为0 或者 PROPAGATE(-3),设置前驱节点的等待状态为SIGNAL,
// 并且之后会回到循环再次重试获取锁
compareAndSetWaitStatus(pred, ws, -1);
}
return false;
}
}
- 解析:
- 在shouldParkAfterFailedAcquire 方法中判断当前线程是否应该被挂起,主要是通过前驱节点的waitStatus(等待状态)的值,Node中的等待状态一共有5种取值,代表的意义如下:
waitStatu值 | 描述 |
---|---|
CANCELLED(1) | 当前线程因为超时或者中断被取消。这时一个终结态,也就是状态到此为止 |
SIGNAL(-1) | 当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程,这个状态一般都是后继线程来设置前驱节点的 |
CONDITION(-2) | 当前线程在condition队列种 |
PROPAGATE(-3) | 用于将唤醒后的线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点称为头节点之前,是不会跃迁为此状态的 |
0 | 表示无锁状态 |
- shouldParkAfterFailedAcquire 方法中,根据waitStatus 不同的值进行不同的操作,主要有以下几种情况:
- 如果waitStatus等于SIGNAL,返回true,将当前线程挂起,等待后续唤醒操作即可
- 如果waitStatus大于0也就是CANCEL状态,会将此前驱节点从队列中删除,并在循环中逐步寻找下一个不是cancel状态的节点作为当前节点的前驱节点
- 如果waitStatus既不是SIGNAL也不是CANCEL,则将当前节点的前驱节点状态设置为SIGNAL,这样做的好处是下一次执行shouldParkAfterFailedAcquire 时可以直接返回true,挂起线程。
3.3.线程挂起操作
- 在acquireQueued方法中,如果shouldParkAfterFailedAcquire 方法返回true表示线程需要被挂起,那么会继续调用parkAndCheckInterrupt 方法执行真正的阻塞线程代码
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public class LockSupport {
public static void park(Object var0) {
Thread var1 = Thread.currentThread();
setBlocker(var1, var0);
UNSAFE.park(false, 0L);
setBlocker(var1, (Object)null);
}
}
- 解析:
- 线程挂起方法比较简单,知识调用了LockSupport中的park方法,在park方法中调用Unsafe的native方法将线程挂起,该方法会执行操作系统中的线程挂起流程
获取锁流程总结:
- AQS的模板方法acquire通过调用子类自定义实现的tryAcquire获取锁
- 如果获取锁失败,通过addWaiter方法将线程构造成Node节点插入到同步队列队尾
- 在acquireQueued方法中以自旋的方法尝试获取锁,如果失败则判断是否需要将当前线程阻塞,如果需要阻塞则最终执行LockSupport中的native方法来实现线程阻塞。
4.释放锁操作
- 释放锁调用了ReentrantLock的unlock方法,该方法需要将上面获取锁失败的线程进行唤醒,并重新执行。具体AQS是何时尝试唤醒等待队列中被阻塞的线程呢?
public class ReentrantLock implements Lock, Serializable {
public void unlock() {
this.sync.release(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int arg) {
int newState = this.getState() - arg;
// 判断当前线程是否是持有锁的线程,不是则直接抛出异常
if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
} else {
boolean isRelease = false;
// 锁状态为0,表示当前尝试释放锁成功,接着执行真正的锁释放操作流程
if (newState == 0) {
isRelease = true;
this.setExclusiveOwnerThread((Thread)null);
}
this.setState(newState);
return isRelease;
}
}
}
}
public abstract class AbstractQueuedSynchronizer{
public final boolean release(int arg) {
if (this.tryRelease(arg)) {
AbstractQueuedSynchronizer.Node h = this.head;
if (h != null && h.waitStatus != 0) {
this.unparkSuccessor(h);
}
return true;
} else {
return false;
}
}
}
- 解析:
- 锁释放操作会调用AQS的release方法,首先会调用子类AQS的实现方法tryRelease尝试释放锁
- 如果返回成功则最终会调用AQS的unparkSuccessor方法来实现释放锁的操作。
public abstract class AbstractQueuedSynchronizer {
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
// 获取node的等待状态,其实node为头节点
int ws = node.waitStatus;
if (ws < 0) { // 1.将头节点的等待状态设置为0(无锁状态)
compareAndSetWaitStatus(node, ws, 0);
}
// 2.获取头节点的下一个节点
AbstractQueuedSynchronizer.Node nextNode = node.next;
if (nextNode == null || nextNode.waitStatus > 0) {
nextNode = null;
// 如果下一个节点为null,或者下个节点的状态为cancel(1),则找到队列中不是cancel的节点
// for循环从队尾开始遍历寻找,找到第一个waitStatus《=0的节点,并进行唤醒
for(Node t = this.tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) {
nextNode = t;
}
}
}
// 3.如果下一个节点不为空,而且等待状态《=0,则调用LockSupport的unpark方法唤醒线程
if (nextNode != null) {
LockSupport.unpark(nextNode.thread);
}
}
}
- 解析:
- 首先获取当前节点(实际上传入的是head节点)的状态,如果head节点的下一个节点是null,或者下一个节点的状态为cancel,则从等待队列的尾部开始遍历,直到寻找到第一个节点的waitStatus小于0的节点
- 如果最终遍历到的节点不为null,则调用LockSupport的unpark方法,调用底层方法唤醒线程。
网友评论