0. 本文目标
本文旨在记录笔者阅读jdk1.8 AQS源码的过程,java.util.concurrent是java的并发包,包含两个包atomic和locks,这两者构成并发框架的基石,前者存放以cas为基础的实现类,后者存放aqs的实现基类。AQS是一个构建锁和同步器的框架,许多同步器都可以通过AQS很容易的构建出来。并发包中的高频使用类如ReentrantLock、Semaphore、CoutDownLatch等均基于AQS 实现的。
1. AQS数据结构
AQS全称是AbstractQueuedSynchronizer,是个抽象类。内部定义一个Node的数据结构,实际上是FIFO的线程等待队列,Node是基于CLH原理的锁队列,又有改进。CLH通常用于自旋锁,在AQS中用于阻塞线程,线程的状态status代表线程是否阻塞,如果前驱节点释放锁,则后继节点会被发信号即将准备运行。
CLH的数据结构如下:
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+
CLH告诉我们,从尾节点加入队列,从头部节点出列。且入列、出列都是原子的。线程没有成功则会进入自旋直到成功为止。
AQS并不是照搬CLH的数据结构,在每个节点又加了next指针,为了实现阻塞机制。每个Node都持有线程id,因此前驱节点发出信号唤醒后继节点就是通过next指针。
Node数据结构:
node数据结构
AQS维护一个状态volatile int state(代表共享资源)和如上图的Node数据结构,就是线程等待队列,当线程争夺资源失败时,则入列。
不同的同步器争用资源的方式也不同。自定义同步器只需要实现共享资源state的获取与释放即可。至于具体线程等待队列的操作,如出列、入列等,AQS已经实现好了。
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
Node状态有四种。AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。
-
CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
-
SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
-
CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
-
PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
0状态:值为0,代表初始化状态。
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
Node属性释义,Node定义的是线程等待队列。waitStatus、prev、next、thread代表的意思显而易见。着重说说nextWaiter,指在condition上等待的节点或者是SHARED节点。由于condition队列是独占模式,因此我们需要简单的队列来持有节点。
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
2.源码解析
AQS中大量私有方法,对外提供的核心方法是acquire-release和acquireShared-acquireRelease。依次来说。
2.1 acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
方法流程如下:
- 步骤1,tryAcquire: 尝试以独占方式获取资源,在AQS中是空方法,子类要重写该方法。通常state代表资源。不同锁中释义不同。获取到资源后,返回true,唤醒线程;反之,返回false,转入下个流程。
- 步骤2,addWaiter: 将线程加入队列尾部,并标记为独占模式。
- 步骤3,acquireQueued: 让线程在等待队列中获取资源,直到获取到资源才返回;如果等待过程中被中断过,则返回true,否则,返回false。
- 步骤4, 步骤3线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
下面直接上源码详解:
2.1.1 tryAcquire
tryAcuire在AQS中是空方法,子类要重写该方法,这里没有定义成abstract方法,原因在于独占锁重写tryAcquire和tryRelease,共享锁重写tryAcquireShared和tryReleaseShared,如果定义成abstract,之类要实现四个方法。Doug Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
2.1.2 addWaiter
加入独占线程节点到对列尾巴上。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 先尝试快读加入队列,成功则返回新节点node,失败,则采用自旋加入节点知道成功返回该节点。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
2.1.3 enq
自旋加入新节点,看源码释义。
private Node enq(final Node node) {
// 整个自旋,直到加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 如果队列为空,则创建head节点并将该节点设置为tail
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 队列存在尾节点,则把node的prev指针指向尾节点tail
node.prev = t;
// CAS重新设置tail节点
if (compareAndSetTail(t, node)) {
// 原尾节点的next指针指向新节点node,node就是新的为节点tail,自旋结束,返回原尾节点。
t.next = node;
return t;
}
}
}
}
2.1.4 acquireQueued
节点加入为节点成功后,尝试在等待队列中自旋获取资源。源码释义如下。
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到资源
boolean failed = true;
try {
//标记等待过程中是否被中断过
boolean interrupted = false;
// 又是自旋,直到node的前驱节点称为头节点,且头结点已释放锁;
// node节点尝试获取资源成功,则node成功头节点。
// 否则线程节点自旋,自旋过程中,有可能中断,是否中断的标记。
// 疑问,标记failed如何生效呢,只有一个return,节点一直在自旋中如何退出呢。
for (;;) {
final Node p = node.predecessor();
// 如果node前驱节点是head节点,下一个就轮到node啦,则node有机会获取资源。
if (p == head && tryAcquire(arg)) {
//拿到资源后,将head指向该结点。
setHead(node);
p.next = null; // help GC ? ***这句尚有疑问***
failed = false;
return interrupted;
}
// 还不具备获取资源条件,说明可以洗洗睡了,如果具备条件,就wait,知道unpark唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.1.5 shouldParkAfterFailedAcquire
线程获取资源失败后,判断是否阻塞线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获得前驱节点的状态,根据前驱节点的状态判断是否需要休息,即阻塞
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驱节点已经被设置为SIGNAL,即前驱节点释放锁后,马上唤醒后继节点node,那作为后继节点
* 就放心了,可以洗洗睡了,不用自旋,等通知就好了
*/
return true;
if (ws > 0) {
/*
* 状态>0,说明前驱线程节点被撤销,跳过所有的被撤销的prev节点,排在它后面
* 将上一个小于0的节点设置为node前驱节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 若前驱节点为 0 或者 PROPAGATE. 就要唤醒前驱节点,并设置为SIGNAL
* 告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
2.1.6 parkAndCheckInterrupt
阻塞线程节点,判断线程是否中断。
另外,park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。
private final boolean parkAndCheckInterrupt() {
//调用park()使线程进入waiting状态
LockSupport.park(this);
return Thread.interrupted();
}
2.1.7 小结
看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),现在让我们再回到acquireQueued(),总结下该方法的流程:
- 节点加入队尾后,检查状态,找到安全休息点。
- 调用park进入waiting状态,等待unpark或者interrupt唤醒自己。
-
唤醒后看看自己有没有资格获取资源,如果成功获取,则head指向当前节点,并返回从入队到拿到号的过程中有没有被中断,如果没有拿到号,继续步骤1。
入队自旋拿号.png
2.2 release
release是释放独占资源,它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。此方法可以用来实现unlock方法。
2.2.1 release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
逻辑很简单,判断线程是否彻底释放共享资源state,同步器要实现具体的释放方法tryRelease
2.2.2 tryRelease
跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
2.2.3 unparkSuccessor
此方法用于唤醒等待队列中下一个线程。
private void unparkSuccessor(Node node) {
/*
* 当前节点的状态
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); //置零当前线程所在的结点状态,允许失败。意义在哪?????
/*
* statue<=0的都是有效节点,唤醒后继节点,通常是node的next指针的对应的节点,
* 但是碰到null节点或者撤销节点,要重tail遍历往回遍历,找到node最近的节点,并唤醒它。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!!And then, DO what you WANT!
2.2.4 小结
release是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
2.3 acquireShared
此方法是共享模式下线程获取资源的顶层入口。获取成功则直接返回,失败则进入等待队列,并自旋知道获取资源为止。
public final void acquireShared(int arg) {
// 该方法只有一个分支,判断是否能获取到共享资源,能够获取到则返回正整数,否则,返回负整数
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
2.3.1 tryAcquireShared
由不同语义的不同锁实现,AQS中是个空方法保留给子类实现。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
比如CountDownLatch,用state==0表示共享资源的状态。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
2.3.2 doAcquireShared
doAcquireShared表示加入等待队列。
private void doAcquireShared(int arg) {
// 同样是自旋加入队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {// 又是自旋
final Node p = node.predecessor();
// 判断节点的前驱节点是否为头节点,如果是,说明下一次就轮到我啦
if (p == head) {
// 尝试获取共享资源
int r = tryAcquireShared(arg);
if (r >= 0) {
// //将head指向自己,还有剩余资源可以再唤醒之后的线程,关键看此方法
setHeadAndPropagate(node, r);
p.next = null; // help GC
// //如果等待过程中被打断过,此时将中断补上。
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
有木有觉得跟acquireQueued()很相似?对,其实流程并没有太大区别。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外,其实都一样,不知道Doug Lea是怎么想的。
跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。
2.3.2 setHeadAndPropagate
成为头节点,唤醒后继节点。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// head指向node
setHead(node);
/*
* 唤醒满足条件的后继节点
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
2.3.3 doReleaseShared
又是自旋释放满足条件的后继节点
private void doReleaseShared() {
/*
* 自旋释放后继节点
*/
for (;;) {
// 自旋将动态的head赋值给变量h
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 变量h有唤醒状态比较然后替换成初始状态,直到成功,则替换后继节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 自旋跳出条件,head不变则跳出自旋,head变化则一直自旋
if (h == head)
break;
}
}
2.3.4 小结
acquireShared源码解读告一段落。让我们温故一下流程。
- tryAcquireShared()尝试获取资源,成功则直接返回;
- 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
其实跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作(这才是共享嘛)。
2.4 releaseShared
releaseShared释放共享锁,是共享模式释放锁的顶层入口。它会释放指定的共享资源,如果释放成功就会去唤醒等待线程。
public final boolean releaseShared(int arg) {
// 尝试释放共享资源,
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
2.4.1 tryReleaseShared
同样是空方法,留给子类重写。返回布尔值,true代表完全释放资源,可以走到分支中通知等待线程队列了。
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
下面看CountDownLatch的tryReleaseShared源码,就是自旋state减1,state==0则完全释放锁返回true。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
2.4.2 doReleaseShared
尝试释放资源失败后,再次尝试释放资源并唤醒后继节点,同上。
2.5 总结
本文介绍了独占和共享两种模式的获取-释放资源方式,共享和独占整体流程差别不大,在于唤醒后继节点的条件,独占锁只能在head释放锁后唤醒后继节点,共享锁能够唤醒满足条件所有后继节点。
网友评论