之前在看ReentrantLock的源码时,发现它的内部使用AbstractQueuedSynchronizer类实现同步,但是当时对AbstractQueuedSynchronizer理解不了,主要是单纯的看源代码容器懵逼。最近在《Java 并发编程的艺术》书,书上对AbstractQueuedSynchronizer做了详细的介绍,从而帮助自己理解了这个类。今天,在这里将我对AbstractQueuedSynchronizer的理解记录下来。
本文主要参考资料:
1.方腾飞、魏鹏、程晓明的《Java 并发编程的艺术》
1.AbstractQueuedSynchronizer的介绍
队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量state来表示同步状态,通过内置的队列来完成线程的排队工作。
同步器的主要使用方式是继承,子类通过继承同步器并且实现它的抽象方法来管理同步状态。同自定义组件里面,子类推荐被定义为静态内部类,这一点我们可以从ReentrantLock里面可以看出来。
简而言之,如果想要定义一个类似于ReentrantLock的锁,在锁的内部,我们可以聚合AbstractQueuedSynchronizer,关于锁的获取锁和释放锁,都由AbstractQueuedSynchronizer来真正操作。可以这样理解二者的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式。
2.使用AbstractQueuedSynchronizer定义一个简易的锁
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public Condition newCondition(){
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
上面就是我们使用AbstractQueuedSynchronizer来实现的一个同步组件。首先,我们会发现,在这个自定义的同步组件中,定义了一个静态内部类Sync,继承于AbstractQueuedSynchronizer,然后在里面实现了一些方法。在这里实现的方法中,是真正操作state字段。前面也说了,在AbstractQueuedSynchronizer的内部使用了一个state来表示同步状态。
同时我们还发现Mutex实现了Lock接口,实现了Lock接口里面的方法。这些方法相当于是对外公开的,面向使用者的。当程序员使用这个自定义同步组件时,通过调用这个方法来达到线程同步的目的。
3.AbstractQueuedSynchronizer的分析
接下来我们将分析AbstractQueuedSynchronizer是如何完成线程同步的,其中包括:同步队列、独占式同步状态的获取和释放、共享式同步状态的获取和释放以及超时获取同步状态等方面。
(1).Node
在说同步队列之前,我们先说一下Node这个类,因为在同步队列里面,存储每个元素就是Node元素,所以对Node这个类必须有一定的了解。
首先,我们来看看Node节点有哪些的属性。
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
我们从Node的成员变量里面,我们可以看到同步队列是一个双向队列。其中next表示队列的下一个节点,而prev表示队列的上一个节点。
waitStatus字段表示的是当前的Node的状态,这个包含四种状态,分别是:CANCELLED,SIGNAL,CONDITION,PROPAGATE,INITIAL。详细解释如下:
状态 | 描述 |
---|---|
CANCELLED | 值为1,由于同步队列中等待的线程等待超时或者是被中断,需要从同步队列中取消等待,节点进入该状态不在变换。 |
SIGNAL | 值为-1,后继节点处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。 |
CONDITION | 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal之后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取。 |
PROPAGATE | 值为-3,表示下一次共享式同步状态的获取将会无条件的传播下去。 |
INITIAL | 值为0,初始状态。 |
Node内部封装的信息差不多就是这些,现在我们已经对Node已经有了一个大概的了解,现在准备讲解一下同步队列了。
(2).同步队列
我们知道,AbstractQueuedSynchronizer内部实现了一个同步队列来完成线程同步的排队。从AbstractQueuedSynchronizer的源码,我们可以看出,AbstractQueuedSynchronizer内部有两个成员变量用来这个队列的队头和队尾。
AbstractQueuedSynchronizer的tail属性表示这个同步队列的队尾,head表示这个同步队列的队头。然后,我们可以根据Node的信息绘制出同步队列的结构图:
AbstractQueuedSynchronizer内部的两个节点引用,一个指向头结点,一个指向尾节点。当一个线程获取同步状态,其他线程将无法获取到同步状态,转而被封装成为一个节点并且加入到同步队列中。在加入这个队列时,必须保证线程安全,因为获取不到同步状态的线程肯定有多个,所以AbstractQueuedSynchronizer提供了一个基于CAS方式设置节点的方法:compareAndSetTail(Node expect, Node update),它需要传递当前线程“认为”的的尾节点和当前节点,只有设置成功后,当前这个线程所在的这个节点才算是成功加入到同步队列里面去。
而设置头结点则不需要保证线程安全,因为只有一个线程在操作setHead方法。为什么只有一个线程在设置头结点呢?首先,我们得理解一个东西,在同步队列里面头结点表示成功获取同步状态的线程所在的节点;在设置新的头结点时,表示当前有一个线程已经成功获取到了同步状态了,其中都被阻塞了,所以只有一个线程在设置头结点,不需要保证线程安全。
(3).独占式同步状态的获取和释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是说由于线程获取同步状态失败后进入了同步队列中,后续对这个线程进行中断操作时,该线程不会从同步队列中移出。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上面的acquire方法里面主要进行三步操作:
1.调用tryAcquire方法来尝试获取同步状态。
2.调用addWaiter方法将该节点添加到同步队列的尾部。
3.调用acquireQueued方法使得该节点以“死循环”的方式来获取同步状态。
其中,调用的tryAcquire方法实际上是我们在Sync里面重写的tryAcquire方法,如下:
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
在tryAcquire方法里面先尝试将state更新为1,如果更新失败的话,返回false;反之,则表示当前的线程获取同步状态,即获到了锁。
在acquire方法里面,我们可以看到如果尝试获取同步状态失败的话,会调用addWaiter方法,我们来看看addWaiter方法究竟为我们做了什么。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//尝试快速的将当前的Node添加到同步队列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果上面的操作失败了,将会采取无限循环的操作来保证肯定添加成功
enq(node);
return node;
}
在addWaiter方法里面,我们知道,首先会尝试快速的将Node添加同步队列中去;如果快速添加失败的话,会调用enq方法来去保证肯定添加成功。
当成功将Node添加到同步队列中去之后,会调用acquireQueued方法去不断的尝试获取同步状态。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acuqireQueued方法里面,当前线程在”死循环“中不断的尝试获取同步状态,并且只有前驱节点为头结点才能够尝试获取同步状态。也就是说,头结点释放了同步状态,首先准备获取同步状态的是头结点的后继节点。
如果一个节点获取同步状态失败或者不符合获取状态的条件,就会进入parkAndCheckInterrupt方法里面进行进行等待。
当一个线程执行完毕自己的操作之后,需要释放同步状态,会调用了release方法来释放同步状态。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release方法的基本步骤也是非常的简单,先是tryRelease方法去尝试释放同步状态,如果释放的成功的话,会调用unparkSuccessor方法去唤醒头结点的后继节点。
(4).共享式同步状态的获取和释放
说完了独占式同步状态的获取和释放,相信大家对它应该有了一个大概的理解。现在我们对共享式同步状态的获取和释放进行简单的分析。
我们先举一个简单的例子,什么是共享式。假设有一个文件,一个进程在对这个这个文件进行读取操作的,那么其他进程只能对这个文件进行读取操作,不能进行写操作,这个就是共享式;如果这个文件被进行一个进程进行写操作,那么其他的所有的操作都会被拒绝,这个写操作就是独占式操作。
之前,我们在分析Node的成员变量时,SHARED变量就是表示当前的Node获取共享式同步状态,EXCLUSIVE表示获取独占式同步状态。
现在,我们来看看获取共享式同步状态的过程。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
我们通过调用acquireShared来获取一个共享式的同步状态,在acquireShared方法里面,调用了tryAcquireShared去尝试获取共享式的同步状态,当返回值大于等于0时,表示能够获取到同步状态;反之则不能获取同步状态。
当获取同步状态失败,会调用doAcquireShared方法,我们来在这个方法里面都在干嘛。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
跟独占式差不多,当获取共享式的同步状态失败时,会在doAcquireShared方法里面进行自旋,如果当前节点的前驱节点为头条节点时,尝试获取同步状态,如果返回值为大于等于0时,表示此次获取同步状态成功并且从自旋状态中退出。
我们也来看看释放共享式同步状态的操作。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
在releaseShared方法里面首先尝试释放同步状态,如果释放成功的话,调用doReleaseShared方法对后续线程进行唤醒操作。
它与独占式的区别是,在tryReleaseShared方法里面必须保证同步状态被安全释放,一般是通过循环和CAS来保证的,因为使用同步状态的操作可能来自多个线程。
(5).独占式超时获取同步状态
通过调用doAcquireNanos(int arg, long nanosTimeout)方法可以超时获取同步状态,即在指定的时间内获取同步状态,如果获取成功的话则返回true;反之,则返回false。该方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。
在Java 5之前,当一个线程获取不到锁而被阻塞在synchronized那里,对该线程进行中断操作,此时该线程的中断标志位会被修改,但是线程依旧会阻塞在synchronized上,等待着获取锁。在Java5中,AbstractQueuedSynchronizer提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前的线程被中断了,会立刻返回,并且抛出InterruptedException。
超时获取同步状态过程可以被视作响应中断获取同步状态过程的”增强版“,doAcquireNanos方法在支持响应中断的基础上,增加了超时获取的特性。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
对于超时获取同步状态,关键在于睡眠时间的计算。为了得到及时的响应,nanosTimeOut = deadline - System.nanoTime(),如果nanosTimeOut小于等于0时,表示时间已经到了;反之则表示时间还未到,需要睡眠nanosTimeOut。
在这种情况下获取同步状态跟独占式情况下获取比较类似,只不过这里需要注意几个地方:
1.当nanosTimeout > spinForTimeoutThreshold时才进行睡眠,因为非常短的超时等待无法做到十分精确,如果此时还要进行超时等待,会让nanosTimeout从整体上表现的反而不精确。
2.如果当前的线程被中断过,就会抛出InterruptedException异常。
4.总结
AbstractQueuedSynchronizer的实现原理最重要的一部分就是同步队列,不管是在获取同步状态还是释放状态,最终还是体现在对同步队列的维护。
说实话,在这之前,我也看过AbstractQueuedSynchronizer类的源代码,但是当时是狗咬乌龟--无从下口。现在借助《Java 并发编程的艺术》对AbstractQueuedSynchronizer有了一定的了解,所以强烈推荐这本书!!!
网友评论