AbstractQueuedSynchronizer
AbstractQueuedSynchronizer
是什么,这么说吧,如果你想自己实现一个同步锁,那么基于AbstractQueuedSynchronizer
去做那就再方便不过了;所有要考虑的线程如何阻塞等待(类似于synchronized的等待队列)、独占锁、共享锁、如何唤醒等待的线程重新去竞争等逻辑都帮你封装好了,而你大致只需要去实现如何释放锁、获取锁的逻辑即可。
至于AbstractQueuedSynchronizer
后续我们简称AQS
简而言之,AbstractQueuedSynchronizer
就是一个队列同步器,内部维护了一个共享变量(int state)和等待线程队列(Node链表)帮助实现者去自定义自己的同步工具,而我们具体要做的就只需要实现其指定的几个方法即可:
方法 | 描叙 |
---|---|
protected boolean tryAcquire(int arg) | 独占式的获取同步状态,成功返回true,否则false |
protected boolean tryRelease(int arg) | 独占式的释放同步状态,成功返回true,否则false |
protected int tryAcquireShared(int arg) | 共享式的获取同步状态,成功返回true,否则false |
protected boolean tryReleaseShared(int arg) | 共享式的释放同步状态,释放成功返回true,否则false |
protected boolean isHeldExclusively() | 是否已经独占式获取到同步状态 |
我们只需要实现以部分上方法即可实现自己的同步锁了,而内部如何进入等待、如何唤醒完全由AQS
自己完成,所以整体是非常方便的。
源码分析
我们先从一个小的demo代码段入手
public class MySyncLock extends AbstractQueuedSynchronizer {
// 直接设置为1
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
// 直接设置为0
@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
// 直接判断是否等于1
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 获取锁
public boolean lock(){
// 真正获取锁
this.acquire(1);
}
// 释放锁
public boolean unlock(){
// 真正释放锁
this.release(1);
}
}
上面的代码是不是非常简单,这个时候去获取同步锁的时候,直接调用lock或者unlock就可以了。
在进入分析之前,这里提前简要介绍一下AQS里的两个重要结构
private volatile int state; // 同步变量(线程竞争的就是它)
private transient volatile Node head; // 指向等待队列的头节点
private transient volatile Node tail; // 指向等待队列的尾节点
...
static final class Node {
static final Node SHARED = new Node();// 共享
static final Node EXCLUSIVE = null; // 独占
static final int CANCELLED = 1; // 节点对应的线程已经被取消了
static final int SIGNAL = -1; // 表示后边的节点对应的线程处于等待状态
static final int CONDITION = -2; // 表示节点在等待队列中
static final int PROPAGATE = -3; // 表示下一次共享式同步状态获取将被无条件的传播下去
volatile int waitStatus; // 当前状态
volatile Node prev; // 上一个Node
volatile Node next; // 下一个Node
volatile Thread thread; // 绑定Node的线程
Node nextWaiter; // 下一个需要唤醒的Node
}
获取锁
接下来看一下acquire(int arg) 的代码
public final void acquire(int arg) {
// 1.限制性用户自定义的同步逻辑,比如上面demo中的compareAndSetState(0, 1);
// 2.若tryAcquire没成功,则执行addWaiter逻辑进行入队
// 3.再次尝试执行同步逻辑,若依然没有执行成功,则进行阻塞
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 1)tryAcquire(arg)尝试执行同步逻辑,
- 2)若tryAcquire不成功则执行addWaiter(构造Node进行入队)
- 3)再执行acquireQueued逻辑(再次尝试执行同步逻辑,若失败则进入阻塞等待)
- 4)最后执行selfInterrupt阻塞等待(上面确定同步逻辑执行返回false后,就进入阻塞等待了)
addWaiter具体逻辑-加入队列
private Node addWaiter(Node mode) {
// 根据当前线程及执行模式构造一个Node节点
Node node = new Node(Thread.currentThread(), mode);
// 判断当前队列是否为空,若为空,则执行enq逻辑(不用想肯定是初始化队列)
Node pred = tail;
// 若队列不为空,则CAS将自身插入队列尾部(线程安全的哦)
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若不为空则执行enq逻辑(初始化队列并执行插入)
enq(node);
return node;
}
enq(初始化队列并执行插入)
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 若此时还没有节点,则直接初始化,构造一个空的头节点(头尾指向这个空节点)
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
}
// CAS再次循环将当前node插入尾部。
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued具体逻辑
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 中断标识,因为下面还要尝试获取一次锁
for (;;) {
final Node p = node.predecessor(); // 获取上一个节点
if (p == head && tryAcquire(arg)) { // 如果上一个节点就是头节点,那再次尝试获取下锁,也省的进入中断等待状态
setHead(node); // 如果获取成功,则将自己设置为头节点
p.next = null; // help GC
failed = false;
return interrupted; // 返回false,这样在开始逻辑里就不会执行终端了
}
// 到这里就真的要等待了,返回中断标识
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
/**
* 这段逻辑就是判断是否需要中断,同时这个过程也会移除waitStatus>0的一些节点
* 1.首先检查上一个节点的waitStatus是否等于-1(前面讲过-1代表这个节点后面的节点都是等待中)
* 2.如果上一个节点的waitStatus>0(前面也讲过,大于0代表节点已经取消),则移除
* 3.不断检测上个节点的waitStatus,直到waitStatus==-1,否则一直移除
* 4.将上个节点状态置为-1
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
/** 逻辑很简单,就是调用LockSupport进行等待,再往下就是调用UNSAFE的逻辑了 */
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
以上都执行完之后,就执行acquire(int arg)中的selfInterrupt()逻辑了,进行线程中断标识设置。
以上代码注释都很清晰,那下面进行一个总结:
1)acquire尝试获取锁,这里会调用用户自定义同步逻辑tryAcquire进行处理
2)如果tryAcquire失败,就代表其他线程已经将同步状态state占有,那么进行节点构造并加入等待队列
3)加入等待队列的同时,判断如果当前队里就一个节点(即上一个节点是头节点),那再尝试CAS获取state状态
4)如果再次获取失败,则设置中断状态进行中断
5)否则获取同步状态(设置state)成功,获得锁成功
OK,接下来分析一下获取锁的过程
图解一
假如此时队列还是空
-
1)线程1获取锁,如果直接tryAcquire成功了,就没有入队啥事了。
AQS2
-
2)此时线程2也尝试去获取锁,此时状态已被线程1占有,因此无法获取锁,进入队列。
-
2.1 先初始化队列(在enq方法中进行)
AQS3
-
2.2 插入自身节点
AQS4
-
-
3)此时线程3来获取锁,那么队列如下:
AQS-队列.png
[图片上传失败...(image-7589be-1599813925954)]
注意这里waitStatus状态的变化,waitStatus变为-1,代表他的下一个节点处于等待状态。
就是这样一个队列,将没有获取到锁的线程进行了等待阻塞维护,同时当前获取到锁的线程在释放锁的时候,会进行下一个节点的唤醒,让其去竞争获取锁。
acquireInterruptibly
和acquire
的区别是它是可中断的,也就是说在一个线程调用acquireInterruptibly而发生阻塞之后,别的线程可以对它进行中断,则acquireInterruptibly方法会抛出InterruptedException异常并返回。
tryAcquireNanos
也是支持中断的,只不过多了一个超时机制而已。
释放锁
既然有获取锁,当然就要释放锁了,对于线程节点来说,那就是移除当前节点,通知下一个节点成为头节点。
@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
// 释放锁
public boolean unlock(){
// 真正释放锁
this.release(1);
}
release(int arg) AQS中的逻辑
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
逻辑很简单,先调用用户自定义的释放同步状态逻辑,若没成功,直接返回false,否则进一步处理(返回true),此时如果head节点不为空且状态不为0 ,则进行unparkSuccessor(head);
其实这里应该能想到,当前head节点就是在释放锁的节点,那么如果waitStatus=-1的话,就代表它后面还有节点在等待,因此就需要它尝试去唤醒对方了,所以unparkSuccessor可想而知就是干这事的。
其实这里head节点有没有可能大于0呢?其实不可能的,因为大于0代表被取消,在每次入队的过程中都会进行移除的,前面的入队逻辑有讲。
那我们来看看unparkSuccessor的逻辑:
unparkSuccessor(head)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 设置当前节点状态为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取下一个节点(正常来讲就是要唤醒的节点)
Node s = node.next;
// 总有些意外,如果下一个节点正好状态被取消了呢~,那就从尾部反过来遍历吧
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部开始遍历,找到离head节点最近的那个节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t; // 找到了
}
// 找到了找到离head节点最近的那个节点,进行唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
上面的逻辑正如上面所述,找到下一个状态waitStatus<0的节点,进行唤醒,注意:这里的下一个节点
不一定是next指向的节点哦。
同时要干的一件事就是将下一个节点设置为头节点,移除当前头节点。那么现在队列就成这样了:

注意:这里好像release过程没有看到重新设置head节点,并且怎么头节点的thread属性变为null了。
这个过程其实很好理解,再回到入队是的逻辑:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
上面逻辑里有一个parkAndCheckInterrupt()
的方法,那么线程就是在这被unsafe阻塞
的,既然是唤醒了,那么该唤醒的线程会继续执行这个for循环
,继续tryAcquire(arg),这里会发现有个setHead(node)
方法:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
那这里就很清楚,下一个节点被唤醒,在尝试重新去获取state状态时,设置了head为自己,同时将thread、pre等属性置为null。完成了释放唤醒、队列的变更。
以上就是独占方式的讲解
引申
那么共享方式锁只需要与独占方式作出区别
就很好理解了。我们知道AQS的主要的核心是state
状态的竞争,独占只需要将state资源保持一个,控制只要同时只有一个线程能够获取state即可。而共享就是将state设置为多个,利用消耗机制,来控制共享线程数量即可。
典型的CountDownLatch
内部实现逻辑如下,就是判断当前state是否被消耗完了。这里不在做详细介绍。
Sync(int count) {
setState(count);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
网友评论