前言
AQS(AbstractQueuedSynchronizer)
是一个抽象类,预定义了一些需要由我们自己实现的方法,用来构建自定义的同步工具。
抽象类同接口一样,都无法利用new语句创建对象实例,它存在意义在于预先帮我们实现好了一些方法,并暴露出一些抽象方法由子类实现,本质上相当于一个模版。
先看一下AbstractQueuedSynchronizer
类里有哪些方法需要我们自己实现。
//独占模式
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
// 共享模式
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
上述五种方法,虽然没有明确要求子类实现,但是可以看到其方法内部没有任何判断逻辑,而是直接抛出了一个UnsupportedOperationException()
的异常,表示不支持该操作,显然当我们实现定制化同步工具时,这5
个方法就是关键所在。而这5
个方法又可以分为独占模式
和共享模式两大类
。
独占模式,比如互斥锁,一个锁在一个时刻只能由一个线程占用,其他线程必须阻塞等待该线程释放锁。而共享模式,比如读锁,可以由多个线程共同持有。
这里我们介绍的是独占模式,也就是前三个方法,我们先来分析一下这三个方法分别代表的含义。
方法名 | 作用 |
---|---|
boolean tryAcquire(int arg) | 尝试去获取同步状态 |
boolean tryRelease(int arg) | 尝试去释放同步状态 |
boolean isHeldExclusively | 判断当前线程有无获取到同步状态 |
这里先解释一下,什么是同步状态。在AQS
类内部,给我们定义了一个私有的类变量state
private volatile int state;
这个state
变量由volatile
关键词修饰,所以保证了其在多个线程之间的可见性。在独占模式下,我们可以定义这个state
只有0
和1
两个状态, 假设0
表示已有线程进入同步代码块,则其他线程阻塞等待,假设1
表示当前没有线程进入同步代码块,则当前线程可以尝试进入。那么当一个线程想进入同步代码块时,会判断state
是否是0
,若是则修改其为1
,表示自己已经成功进入同步代码块,其他线程需要等待,这个过程就是tryAcquire
。而当已经在同步代码块内部的线程准备离开时,必须要修改当前state
由1
变为0
,这个过程就是tryRelease
。
理解了这个是不是就很容易说自己实现tryAcquire
方法呢?比如如下代码。
//错误的实现
@Override
protected boolean tryAcquire(int arg) {
if (state == 0) {
state = 1;
return true;
} else {
return false;
}
}
显然在并发场景下这里会发生错误,比如线程A
和线程B
同时判断state==0
为true
,然后都修改state = 1
,此时线程A
和B
都进入同步代码块,这显然和我们独占模式的设计冲突了。这是一个经典的比较并更新的场景,需要我们保证这两个操作的原子性。AQS
内部给我们提供了几个访问和设置state
值的方法。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected final void setState(int newState) {
state = newState;
}
protected final int getState() {
return state;
}
可以看到第一个compareAndSetState
方法利用CAS
保证了原子性,而第二个setState
则没有,setState
方法可以用在我们之前所说的tryRelease
方法里,因为独占模式下同步代码块内部同一时刻最多只存在一个线程,所以不会发生并发错误,只要简单的通过setState
方法将state
的值由1
设置为0
就可以了。所以正确的tryAcquire
实现如下:
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
现在只要我们实现了自己的tryAcquire
方法和tryRelease
方法逻辑,就可以自定义一个同步工具了,当一个线程tryAcquire
成功时,就会发生阻塞。当一个线程tryRelease
成功时,就离开了同步代码块。AQS
已经帮我们实现了诸如阻塞等待等的复杂方法,tryAcquire
和tryRelease
只是它们判断逻辑中小小的一环而已,下面看一下AQS
的具体执行流程。
acquire()
tryAcquire
方法的注释里有一段话,This method is always invoked by the thread performing acquire. If this method reports failure, the acquire method may queue the thread.
说的是tryAcquire
都会被acquire
执行,并且如果返回false
,就有可能把当前的请求线程加入到一个等待队列中去,直到被唤醒。我们先看下这个等待队列是什么样的。
- 等待队列的节点类
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS
内部定义了一个静态内部类Node
,也就是这个等待队列的节点,从Node
的定义上可以看出,这个等待队列实际上是一个双向链表,链表中的每一个节点代表了对应的Thread
,同时还给每个Node
设置了waitStatus
状态(之后细说)。
AQS
内部定义指向这个队列的头节点和尾节点的引用,同样用volatile
修饰,保证多线程这间的可见性。
private transient volatile Node head;
private transient volatile Node tail;
之前说了,我们自己实现的方法tryAcquire
只是acquire
执行逻辑之的一环,我们来看看acquire
具体是如何执行,并使用这个双向链表结构的队列的。
-
acquire
方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
当tryAcquire
失败时,说明此时已有其他线程在同步代码块内部,需要等待,于是执行addWaiter
方法。
-
addWaiter
方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter
方法的功能是创建一个代表当前线程的Node
加入到等待队列中去,其执行逻辑为
- 为当前进入同步代码块失败的线程,创建一个对应的
Node
实例,这里的mode
有两种Node.EXCLUSIVE
和Node.SHARED
,在独占模式下也就是选择Node.EXCLUSIVE
。- 拿到当前等待队列的尾节点
tail
,如果当前等待队列不为空且执行CAS
操作插入成功,则该节点成为新的tail
,则返回。- 否则调用
enq
方法进行插入,并返回。
值得关注是,若当前队列不为空,这个插入操作是需要通过CAS
来完成的,否则可能造成并发错误,使得多个节点都误以为自己插入成功,实际上最终真正插入成功的只有一个,从而造成一些节点丢失的现象。在这里,多个线程在第一步都会把自己的prev
指向当前队列的尾部,然后通过CAS,使得只有一个节点成为新的tail
节点,其余的线程会再去执行enq
方法。
然后再看下未通过CAS成功插入的节点会发生什么。
-
enq
方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq
方法整体上采用了一个无限循环结构,也就是不断的尝试插入,直到成功为止。
- 判断当前队列是否为空,如果为空则插入一个新节点作为头部,该节点不表示任何线程,具体作用之后会提到。
- 若当前队列不为空,采用和之前
addWaiter
里一样的方法,执行CAS
插入,不成功则重试,直到成功为止。
当一个代表特定线程的Node
被加入到队列中(尾部)之后,acquire
会接着调用acquireQueued
方法,acquireQueued
方法具体实现如下。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued
内部实现整体上还是一个无限循环,具体执行流程如下
- 拿到之前加入队列的节点的前驱节点
- 判断这个前驱节点是不是
head
节点,(注意我们之前提到过head
节点是不指向任何线程的 ), 如果是head
节点,那么说明当前节点排在等待队列的队首,即有可能马上就会被唤醒,所以阻塞该线程之前,再尝试一下看看能否进入同步代码块。- 如果运气好,
tryAcquire
成功,那么这个等待的线程说明可以进入同步代码块了,下面要做的就是把该节点设为新的头节点(因为这个节点已经没用了)。并且把之前的头节点的next
指针设置成null
,来让GC
把它回收掉。- 如果不能不满足上述条件,说明当前线程需要阻塞等待了,但在真正进入阻塞等待状态(调用
parkAndCheckInterrupt
方法)之前,还会再进行一次判断,是不是应该wait
, 如果不是,再重新执行循环流程。这个再判断的方法就是shouldParkAfterFailedAcquire
。
-
shouldParkAfterFailedAcquire
方法的功能就如同它的名字一样,判断在tryAcquire之后,是否要进行阻塞等待状态
。该方法传入的参数是当前节点node
的其前驱节点pred
。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0){
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这里出现了我们之前没有介绍的waitStatus
,Node
节点内部定义了几种Status
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
CANCELLED = 1
表示当前线程已经被取消,可以忽略并把它移出队列了,SIGNAL = -1
表示当前节点的后继结点
需要被唤醒, condition
表示该线程处于条件等待状态。waitStatus
的初始状态为0
。基于此我们再来看shouldParkAfterFailedAcquire
的执行流程。
- 获取当前节点其前驱节点的
waitStatus
,如果waitStatus== node.SIGNAL
,说明其后继节点(也就是当前节点)处于一个等待被唤醒的状态
,也就是说处于阻塞等待状态,因此返回true
。- 如果
ws > 0
也就是其处于CANCELLED
状态,因当从队列中移除,于是利用node.prev = pred = pred.prev;
移除所有CANCELLED
节点。- 否则,利用CAS操作将前驱节点的
waitSatus
设置为SINGNAL
, 表示其后继节点应当处于阻塞且需要被唤醒状态,返回false
,当处于外层的循环下一次执行事,就会返回true
并开始阻塞。
也就是说shouldParkAfterFailedAcquire
,除了给当前节点赋予需要被唤醒状态的同时(通过设置前驱节点的waitStatus
来完成),还复杂移除当前节点前驱中的代表已经被取消的线程的节点。
release()
相较于acquire
,release
方法的逻辑要简单很多
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 首先判断是否能够成功释放同步状态,若可以,则获取当前队列的头节点,否则返回false。
- 获得当前队列的头节点, 如果当前队列不为空,且其
waitStatus != 0
,说明其后继节点所关联的可能是一个处于阻塞状态需要被唤醒的线程,于是调用unparkSuccessor
唤醒它,这里体现了第一个不代表任何线程的头节点的作用。- 无论是否成功唤醒一个等待中的线程,最终都会释放同步状态,返回true。
再来看下unparkSuccessor
方法做了什么。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 首先如果当前节点的
waitStatus<0
,则将其清0,因为它的后继节点所代表的线程会被唤醒。- 找到要被唤醒的线程,如果要被唤醒的线程被取消或者为null,则反向从队尾开始找到离队首最近的未被取消的线程节点。
3.如果线程存在,则调用LockSupport.unpark(s.thread)
方法,将其从阻塞状态中恢复。
应用
ReentrantLock
是JUC
包里提供的一个支持公平/非公平的可重入锁,其实际就是基于AQS
构建的一个同步工具。
- ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
....
}
static final class NonfairSync extends Sync {
....
}
static final class FairSync extends Sync {
....
}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
....
}
其内部包含一个继承于 AQS
的抽象子类sync
,并在sync
的基础上实现了NonfairSync
非公平锁和FairSync
公平锁来实现对应的功能。
下面让我们也基于AQS
,模仿ReentrantLock
中的写法,来实现自己的一个互斥锁。
public class MyLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
private Sync sync;
public MyLock(){
sync = new Sync();
}
public void lock(){
sync.acquire(1);
}
public void unlock(){
sync.release(1);
}
}
网友评论