AQS框架最最最简单流程梳理
AQS是Lock的基础,之前一直想自己梳理一下,奈何一直没有静下心来梳理,这里记录并做梳理,文章基于JDK 1.8。
独占锁的上锁流程
AQS源码中给了我们一个最常见的互斥锁的代码示例:
public class MutexCustom implements Lock {
public static final int STATUS_ORIGIN = 0;
public static final int STATUS_TARGET = 1;
private Sync sync;
public MutexCustom() {
sync = new Sync();
}
public static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(STATUS_ORIGIN, STATUS_TARGET)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setState(STATUS_ORIGIN);
setExclusiveOwnerThread(null);
return true;
}
@Override
protected boolean isHeldExclusively() {return true;}
public Condition newCondition() {return new ConditionObject();}
public int getCurrState() {return getState();}
}
@Override
public void lock() {sync.acquire(STATUS_TARGET);}
@Override
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(STATUS_TARGET);}
@Override
public boolean tryLock() {return sync.tryAcquire(STATUS_TARGET);}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(STATUS_TARGET, unit.toNanos(time));
}
@Override
public void unlock() {sync.release(STATUS_TARGET);}
@Override
public Condition newCondition() {return sync.newCondition();}
public int getState() {return sync.getCurrState();}
}
我们先来关注最基础的最基础的最基础的锁的功能,也就是acquire获取锁
和release释放锁
acquire获取锁
public final void acquire(int arg) {
// Node.EXCLUSIVE为null
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里有三个主要的方法:
- tryAcquire尝试获取锁,得到true,也就是获取到锁直接返回,也就是不会走里面的方法体,直接执行临界区的代码了。
- tryAcquire返回false,执行addWaiter,顾名思义,把自己加到等待的nodeList最后面
- acquireQueued
首先明确锁是怎么生效的,如果这3个方法都很快的返回了结果,不管返回的是true还是false,都会继续往下执行,区别只不过是执不执行selfInterrupt()方法罢了,并没有影响当前线程继续往后执行后续方法,也就是临界区代码。
如果这三个方法里面有某一个方法没有很快的返回结果,而是把自己困在了一个while
或者for(;;)
循环里面,或者使用LockSupport.park()
方法,那么这个线程就阻塞住了,不能继续往后执行后续的临界区代码,那这个时候就实现了阻塞。
在合理的逻辑下竞争和释放这个阻塞的代码就实现了锁。
那究竟是在哪个方法里面阻塞住了呢?我们挨个方法去看:
1.1. tryAcquire
// 需要被重写,给出获取资源逻辑
// 比如最简单的互斥锁,只需要在state为0的时候compareAndSetState(0,1)成功返回true就好了。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这个没有什么好说的,既然AQS是实现Lock的基础框架,那肯定就要把逻辑代码空出来让开发者自己去实现,返回true表示竞争到了锁,可以执行临界区代码,反之没有竞争到锁,需要等待锁的释放并重新竞争。
1.2. addWaiter
private Node addWaiter(Node mode) {
// mode如果是Node.EXCLUSIVE则为null,so,node的next现在为null
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速插入到最尾端,也就是把新创建的node放到tail的next,然后用node取代tail。
// tail是全局变量,初始化为null
Node pred = tail;
// 前提是tail不为空
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果tail为空则直接走enq方法。
enq(node);
return node;
}
// 其实比快速插入不一样的就是如果没有tail,把tail赋值成head,然后再继续append
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 初始化状态tail为null,给head设置一个new Node(),同时把tail也指向这个head
// tail\head同为全局变量,初始化都为null
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
看着注释理一遍:
addWaiter
方法顾名思义,就是把自己的node插入到链表的最尾端,加入抢占资源的排队序列,等待抢占资源。
需要注意的是,tail / head
都是全局变量,都是初始化为null,刚开始运行的时候tail / head
都为null,如果tail为null,会走enq
方法,做了两件事
- tail为null,先把head初始化为一个
new Node()
,同时暂时把tail赋值为head。 - 把当前node赋值为head的next,同时当前node作为tail。
注意返回值是t,而不是当前node,也就是返回node的preNode,是t,上述情况下当前链表有两个值head->tail,返回的应该是是head而不是tail。当然这里调用enq
方法并没有使用它的返回值。addWaiter
方法返回值是当前node。
所以从空链表初始化结束后,链表里应该有两个节点,而不是只有一个head节点,保证了工作节点能获取到一个pre节点。
compareAndSetHead和compareAndSetTail这些CAS方法都不是阻塞的,都是Unsafe方法实现,所谓乐观锁,不用关心脏数据,只需关心成功或者失败。
1.3. acquireQueued
这里附上一个Node状态的表格:
类型 | 值 | 说明 |
---|---|---|
CANCELLED | 1 | 当前节点由于超时或者中断被取消,节点进入这个状态以后将保持不变。 注:这是唯一大于0的值,很多判断逻辑会用到这个特征 |
SIGNAL | -1 | 后继节点会依据此状态值来判断自己是否应该阻塞、当前节点的线程如果释放了同步状态或者被取消、会通知后继节点、后继节点会获取锁并执行(当一个节点的状态为SIGNAL时就意味着其后置节点应该处于阻塞状态) |
CONDITION | -2 | 当前节点正处在等待队列中,在条件达成前不能获取锁。 |
PROPAGATE | -3 | 当前节点获取到锁的信息需要传递给后继节点,共享锁模式使用该值 |
INITIAL | 0 | 节点初始状态,默认值,表示当前节点在sync同步阻塞队列中,等待着获取锁 |
// 放入等待队列
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 如果前一个节点是head并且这时获取到了临界资源,把当前node设置为head,head的next置为null
// 两个条件:
// 1. node是head的next节点,也就是轮到它了。
// 2. tryAcquire(arg)得到了资源,也就是获取到了锁。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 注意哈,这里是&&,也就是shouldParkAfterFailedAcquire方法返回true才会执行parkAndCheckInterrupt,两个方法同时返回true才会执行interrupted = true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 检测失败的时候是否需要阻塞自己,我要睡觉了,等人来叫醒我再检测是不是需要排队。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 注意,ws是上一个节点的status
int ws = pred.waitStatus;
// SINGAL: 此node的后续node通过park把阻塞住了,所以当前node在release或者取消acquire的时候必须unpark一下后续的node。
// (当一个节点的状态为SIGNAL时就意味着其后置节点应该处于阻塞状态)
if (ws == Node.SIGNAL)
// 前置节点已经设置了SIGNAL,可以阻塞我的线程了,后续检测到Node.SIGNAL会唤醒我的线程
return true;
if (ws > 0) {
// 前置节点被设置为cancel了,跳过pre,我来当pre, ^_^
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 检测到自己是INITAL或者PROPAGATE,把pre的status设置为SIGNAL,继续循环
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//park睡觉了,睡醒了之后检测是否需要中断。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
注释很清晰了,梳理一下:
acquireQueued
方法顾名思义,尝试获取锁,获取不到就进入队列等待。
- acquire处在一个for(;;)死循环中, 获取锁需要两个条件:
1.1. 前置节点是head,也就是轮到我自己获取锁了
1.2. tryAcquire(arg)==true,也就是竞争到了锁 - 没必要一直尝试获取锁,把自己标记为需要唤醒,同时调用park阻塞,告诉predecessor,需要的时候unpark我。
LockSupport.park(this);
之后当前线程会阻塞,直到有unpark操作才会继续往后执行。所以在for(;;)死循环中并没有一直尝试获取锁,在获取不到之后会阻塞自己,并把node的状态标记为SIGNAL
。后续看到node的状态为SIGNAL
,在自己的锁释放后会调用unpark唤醒自己继续执行for(;;)中没有执行完的部分,继续尝试tryAcquire获取锁,这时就能获取到锁并结束循环了。
这就是最基础的获取锁的功能。
release释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// head不为空,并且status不是初始状态,则需要唤醒unpark一下下一个节点。否则下个节点还没有阻塞,不需要unpark唤醒。
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 需要unpark唤醒下一个节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 需要unpark的Thread被当前node的继任node持有,通常情况下是下一个node,但是有可能是cancel状态或者是null,
// 从尾部逆序往前遍历以找到状态不是canceled的继任者。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒下个节点
if (s != null)
LockSupport.unpark(s.thread);
}
释放锁的流程看起来就简单了一些。tryRelease
成功释放锁之后,找到下个节点,唤醒他。
至于为什么需要从后往前遍历而不是从前往后遍历:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//看这一块的代码
node.prev = t;
// ①
if (compareAndSetTail(t, node)) {
// ②
t.next = node;
return t;
}
}
}
}
这段代码是往tail插入新的next节点,使用CAS保证安全,如果当前线程执行到了②,但是这时有一个新线程执行到了①,新线程只执行了把node的pre指向了tail,这时tail的next仍然为null,这时tail变了再执行CAS就会失败了。
这种情况下从后往前遍历是能够找到的,所以是安全的。
参考链接:
https://www.zhihu.com/question/50724462?sort=created
https://blog.csdn.net/anlian523/article/details/106448512/
https://blog.csdn.net/java_lifeng/article/details/102482634
网友评论