背景和简介
AbstractQueuedSynchronizer
简称 AQS 是一个抽象同步框架,可以用来实现一个依赖状态的同步器。
JDK1.5中提供的java.util.concurrent包中的大多数的同步器(Synchronizer)如 Lock, Semaphore , Latch , Barrier等,这些类之间大多可以互相实现,如使用 Lock 实现一个 Semaphore 或者反过来,但是它们都是基于 java.util.concurrent.locks.AbstractQueuedSynchronizer
这个类的框架实现的,理解了这个稍微复杂抽象的类再去理解其他的同步器就很轻松了。
为什么需要AQS
上面也提到,这些同步器间可以互相实现,但是它们间并没有一个比其他都更底层更抽象
下面实现一个简单的Semaphore 的例子
private static class MySemaphore {
private Lock lock = new ReentrantLock();
private Condition available = lock.newCondition();
private int permit;
public MySemaphore(int permit) {
this.permit = permit;
}
public void acquire() {
lock.lock();
try {
while (permit <= 0) {
try {
available.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
permit--;
} finally {
lock.unlock();
}
}
public void release() {
lock.lock();
try {
permit++;
if (permit > 0) {
available.signalAll();
}
} finally {
lock.unlock();
}
}
}
synchronized
关键字的监视器锁模式针对于几乎只有一个线程、无竞争的加锁的情景,在减少加锁内存开销和锁获取时间延迟放面做了许多优化,如偏向锁、轻量级锁等。
而上面的同步器在大多数情况下都是在多线程多处理器的情况下使用的,所以synchronized并不适用。相对于内存开销和获取延迟,AQS的性能目标更针对于扩张性(scalability),也就是说无论有多少线程尝试使用同步器,需要同步的开销我们希望应该都是常量级的。
state相关
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSwapInt(this, STATE, expect, update);
}
队列queue相关
queue是一个FIFO的队列,每个节点都是下面的内部类Node类型,等待着state这个资源
// 节点node进入队列,采用CAS的方式,返回其前驱
private Node enq(final Node node) {
for (;;) {
Node oldTail = tail;
if (oldTail != null) {// 队列不为空
U.putObject(node, Node.PREV, oldTail);// 插入节点至队列尾部
if (compareAndSetTail(oldTail, node)) {//CAS修改队尾为node,之所以CAS是因为可能有多个线程争相入队
oldTail.next = node;
return oldTail;
}
} else {// 队列为空,先初始化
initializeSyncQueue();//设置头结点
}
}
}
private final void initializeSyncQueue() {
Node h;
if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
tail = h;
}
// 将当前线程以mode的方式(EXCLUSIVE或者SHARED)构成新节点并入队,返回这个新节点
private Node addWaiter(Node mode) {
Node node = new Node(mode);
// 更快的入队方式,如果失败再采用较慢的标准入队方式enq
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
// 把node设置为新的头,老的头出队
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
addWaiter 与enq 队列
AbstractQueuedSynchronizer类底层的数据结构是使用双向链表,是队列的一种实现,故也可看成是队列,其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。
获取与释放资源相关
资源获取分为EXCLUSIVE和SHARED两种模式,对应acquire与release、acquireShared与releaseShared。
- EXCLUSIVE 模式获取
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里tryAcquire
需要继承类自己实现(成功true,失败false),如果tryAcquire
成功则直接返回,否则addWaiter
将当前线程以独占节点的方式置于同步队列尾部等待。acquireQueued
使得该节点等待获取资源,一直获取到资源才返回,整个等待过程中如果有中断是不响应的,但是获取资源后会用selfInterrupt
补上。
// acquireQueued -> shouldParkAfterFailedAcquire
// 在 acquire 失败后检查并更新node的status, 如果需要阻塞线程就返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驱pred获得资源后会通知当前节点node阻塞
return true;
if (ws > 0) {
// 前驱取消了资源获取,那么当前节点就要遍历前面节点,找到最近一个正在等待的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;// 此处 pred.waitStatus < 0,亦即pred 还在等待尝试获取资源
} else {
// 前驱正在等待,则设置其状态为SIGNAL,让他获取资源后通知本节点,
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
// 但是本节点不能马上阻塞,因为设置不一定能成功,需要下次再次检查
}
return false;
}
// shouldParkAfterFailedAcquire 返回true,就阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
这里需要提一下LockSupport的用法,LockSupport给每个线程绑定一个类似 Semaphore 的 permit 许可数量,不过permit最大为1,初始时permit为0。初次调用 park
为0,主线程一直处于阻塞状态。因为许可默认是被占用的,调用park()
时获取不到许可,所以进入阻塞状态。所以要先释放许可unpark()
,再获取许可park()
,主线程能够正常终止。调用 park()
后会将 permit 重新置为0,所以LockSupport 是不可重入的。
- EXCLUSIVE资源释放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这里tryRelease
也需要继承类自己实现(成功true,失败false),如果释放成功,则调用unparkSuccessor
唤醒后继节点返回true,否则返回false
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)// 可能需要释放通知信号,把状态置零,允许失败
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;// 找到后继节点
if (s == null || s.waitStatus > 0) {// 如果后继节点为空或者已经取消
s = null;// 确保该节点的释放
for (Node t = tail; t != null && t != node; t = t.prev)// 从队尾开始找到需要通知的最近的后继节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)// 如果需唤醒的后继节点存在则唤醒之
LockSupport.unpark(s.thread);
}
- SHARED资源获取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 等待获取共享资源时不响应中断,但是获取资源成功后会用selfInterrupt补上
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 入队尾
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {// 处于队列第二个位置,可以尝试获取资源
int r = tryAcquireShared(arg);
if (r >= 0) {// 获取成功
setHeadAndPropagate(node, r);// 将自己设为队列头,并唤醒可能获取资源的后面几个节点
p.next = null; // help GC
if (interrupted)
selfInterrupt();
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 同acquireQueued的分析
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);// 设置新的头
// 如果还有资源,则唤醒下一个,采用保守策略,
// 多唤醒几次即使没获取到资源也无所谓,尽量做到不漏掉资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
- SHARED资源释放
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {// 需要唤醒
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))//设置WaitStatus失败
continue; // loop to recheck cases
unparkSuccessor(h);// 一定要设置成功才唤醒
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head)// 头变了,不需要继续唤醒
break;
}
}
内部类
- Node
等待队列的节点类,等待队列是CLH(Craig,Landin,Hagersten)锁队列的一种变体,CLH锁通常用来作为自旋锁,对于处理线程的取消和超时很方便。
static final class Node {
// 资源获取模式
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 0:初始状态;
// CANCELLED 1:被取消;
// SIGNAL -1:当前线程释放资源或取消后需要唤醒后继节点;
// CONDITION -2:条件等待;
// PROPAGATE -3:下一个acquireShared操作应该被无条件传播。
// 实际使用中,一般只关注正负,非负数就意味着节点不需要释放信号
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
// 作为同步队列节点时,nextWaiter有:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;
// 与ConditionObject搭配使用作为条件等待队列节点时,nextWaiter保存后继节点。
// 所以实际上这个Node类是被复用了,既用于同步队列,也用于条件等待队列。
Node nextWaiter;
}
- ConditionObject
这个类实现了Condition接口,主要用来完成常见的条件等待、唤醒等操作。一个ConditionObject 包含一个等待队列,由firstWaiter
和lastWaiter
决定。当前线程调用Condition.await()
方法时,会被构造成为节点,然后置于条件等待队列队尾。
常用的等待方法
// 条件等待,响应中断
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();// 响应中断
Node node = addConditionWaiter();// 当前线程加入条件等待队列
int savedState = fullyRelease(node);// 释放资源,并获得本节点需要的资源数,以便再次获取
int interruptMode = 0;
while (!isOnSyncQueue(node)) {// 如果当前节点不是在同步队列,也就是还在等待队列等待着条件发生
LockSupport.park(this);// 就会一直阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//加入同步队列等待获取资源,成功后要检查中断响应
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果最后一个条件等待节点是取消的状态
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();// 清理整个链路的无效节点
t = lastWaiter;
}
//以条件等待的方式将当前线程封装成节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)//条件等待队列为空就初始化
firstWaiter = node;
else// 队列不空,插入队尾
t.nextWaiter = node;
lastWaiter = node;
return node;// 返回新插入的节点
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();// 本节点需要的资源数
if (release(savedState)) {// 释放掉这么多资源
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
信号方法
public final void signal() {
if (!isHeldExclusively())// 要使用该方法必须先是独占线程
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);//找到第一个条件等待节点,并发出信号
}
// 去掉条件等待队列的节点,直到遇上没取消的或者空节点
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))// 节点已被取消
return false;
Node p = enq(node);// 条件等待队列的第一个节点被加入同步队列的队尾
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);// 唤醒节点对应线程
return true;
}
实例分析
-
ReentrantLock
ReentrantLock
有FairSync
和NonfairSync
两个类来实现公平锁和非公平锁,我们看非公平锁,主要几个方法是-
lock()
,使得NonfairSync
调用compareAndSetState
把state从0设为1,并用setExclusiveOwnerThread
把当前线程设为独占线程(亦即首次获得锁),如果失败则使用acquire(1)
调用nonfairTryAcquire
。总体流程就是如果state为0,那么就是本线程首次获得锁,把state置为1,否则如果当前线程是独占线程则将state+1(这也是锁可重入的关键),如果都不是就进入acquireQueued
流程等待获得锁了了 -
unlock()
,调用AQS的release(1)
方法,实际上是调用了Sync的tryRelease(1)
方法,如果state-1为0,那么返回true,否则返回false。也就是说,重入锁必须释放够重入次数才算真正释放成功,但是unlock()
方法本身不会管这个最终结果,只管释放 -
tryLock()
,与lock()
区别是不等待,立即返回,只有唤醒时就是独占线程才能返回true,实现方法是nonfairTryAcquire
-
newCondition()
直接返回了了AQS的内部类ConditionObject
-
isLocked()
如果state为0则表示未加锁返回false,否则返回true
-
-
CountDownLatch
CountDownLatch 主要几个方法是-
CountDownLatch(int count)
,构造方法,设置 AQS 的 state 为 count -
await()
,调用 AQS 的acquireSharedInterruptibly(int arg)
方法,然后调用自己覆盖的tryAcquireShared(int acquires)
来获得state的值是否为0,如果是0就结束等待直接返回了,如果不是0就调用 AQS 的doAcquireSharedInterruptibly(int arg)
方法,该方法会循环等待,直到state为0才返回或者被中断。 -
countDown()
,调用 AQS 的releaseShared(int arg)
方法,实际上是调用了自己覆盖的tryReleaseShared(int releases)
方法,把 state 减了1,如果此时state为0,则调用 AQS 的doReleaseShared()
方法
-
总结
总体而言,AQS提供了一个模板方法模式,将获得锁释放锁一些必要的流程操作都规定好了,我们只需要填充一些具体的获得与释放方法
-
getState()
,setState(int newState)
,compareAndSetState(int expect,int update)
:是资源相关操作,保证原子性 -
tryAcquire(int arg)
:尝试独占获取资源。成功返回true,失败返回false。 -
tryRelease(int arg)
:尝试独占释放资源。成功返回true,失败返回false。 -
tryAcquireShared(int arg)
:尝试共享获取资源。负数表示失败,非负数表示成功代表剩余可用资源 -
tryReleaseShared(int arg)
:尝试共享释放资源。如果释放后可以唤醒后续等待结点返回true,否则返回false。 -
isHeldExclusively()
:代表当前线程是否独占资源,只有用到Condition之时才需要去实现它。
自定义同步器时,一般都是自己写一个 static class Sync extends AbstractQueuedSynchronizer
静态内部类来实现具体的方法。
网友评论