[TOC]
概述
在多线程程序中,往往存在对共享资源的竞争访问,为了对共享资源进行保护,需要使用一些同步工具,比如 synchronized、ReentrantLock、Semaphore、ReentrantReadWriteLock等,后三种都是在同步框架 AQS(即 AbstractQueuedSynchronizer)的基础上构建的。
下面这段话来自 AQS 的代码文档,描述了其设计意图:提供一个框架用于实现依赖先进先出 FIFO 等待队列的阻塞锁和相关同步器。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
下面我们重点分析一下同步框架 AQS 的实现原理和用法。
实现
一个同步器有两个基本功能:获取同步器、释放同步器。AQS 提供了多种锁模式:
- 独占、共享
- 可中断、不可中断
- 带超时时间的
设计模式
AQS 采用了标准的模版方法设计模式,对外提供的是以下的方法:
// 独占模式
public final void acquire(int arg);
public final boolean release(int arg);
// 独占可中断
public final void acquireInterruptibly(int arg)
throws InterruptedException;
// 独占带超时时间的
public final boolean tryAcquireNanos(int arg, long nanosTimeout);
// 共享模式
public final void acquireShared(int arg);
public final boolean releaseShared(int arg);
// 共享可中断
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException;
// 共享带超时时间
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;
这些方法上都带了 final 关键字,也就是说不允许重写,那么哪些方法可以重写呢?
//独占模式
protected boolean tryAcquire(int arg);
protected boolean tryRelease(int arg);
//共享模式
protected int tryAcquireShared(int arg);
protected boolean tryReleaseShared(int arg);
//是否是独占模式
protected boolean isHeldExclusively();
这些模版方法在代码中起什么作用呢?请看他们的调用方,在 acquire 方法中,当 tryAcquire 返回 true 则表示已经获得了锁,否则先 addWaiter 进入等待队列,再 acquireQueued 等候获取锁。acquireInterruptibly 也是类似的,区别只是对中断的处理不同。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
以 acquire 为例,真正的排队阻塞等待锁是在 addWaiter 和 acquireQueued 中,那么 tryAcquire 方法需要做什么事儿呢?我们先看一下 ReentrantLock 中公平锁 FairSync 和非公平锁 NonfairSync 的实现:
// 公平锁
static final class FairSync extends Sync {
// omit a lot
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// 非公平锁
static final class NonfairSync extends Sync {
// omit a lot
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
// omit a lot
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平锁的 tryAcquire 方法所做事情大致如下:
- 检查了锁的状态,如果锁不被任何线程占用,且没有等待中的线程,则尝试用 CAS 设置状态,成功则 setExclusiveOwnerThread 设置占用锁的线程为当前线程。
- 如果当前线程已经占有了该锁,则 setState 更新更新重入次数。
- 上述两条都失败,则表示无法立即获得锁,返回 false。
非公平锁的 tryAcquire 也大致一样,只是缺少判断是否有等待中的线程,而是尽最大努力去获取锁。他们的共同点是更新锁状态 state,设置当前占用锁的线程 setExclusiveOwnerThread,其中最关键的是更新锁状态 state。
值得注意的是,为何 tryAcquire 这些方法不是抽象方法,而是提供了一个默认的抛异常的方法呢?因为 AQS 中包含多种模式,而实际使用者一般只需要一种,如果不提供默认拒绝的实现,那就需要使用方去手动覆盖,反而显得啰嗦了。
// in AQS
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
CHL 队列
在 AQS 中,提供了一个基于 CHL 队列的先进先出的阻塞锁。CHL 队列的入队出队操作是不需要加锁的,只是在入队成功后等待资源释放时会阻塞。在Java锁的种类以及辨析(二):自旋锁的其他种类里也提到了一种基于自旋的 CHL 锁。
CHL 队列本质上就是一个基于 CAS 实现的双向链表,其实也算“无锁队列”了,其节点 Node 如下所示:
static final class Node {
volatile int waitStatus; // 节点状态
volatile Node prev; // 前置节点
volatile Node next; // 后置节点
volatile Thread thread; // 所属线程
Node nextWaiter; // 可以用于标记独占 OR 共享模式,也可以用来记录等待条件变量的下一个节点
}
当 nextWaiter 用于标记独占或共享时,其值可以为:
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
waitStatus 的值可以有以下几种:
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
入队
CHL 队列的入队操作间 addWaiter 方法,入队采用 CAS 自旋重试,总是添加到尾节点:
// 尾节点
private transient volatile Node head;
// 头节点
private transient volatile Node tail;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
出队
如果当前节点的前任节点是头节点,则尝试获得锁,成功则出队,并且返回。下面代码的第 8-11 行为出队操作,所做事情为:把本节点 node 置为头节点,并且把新旧头节点直接的关联字段置空(新节点 prev 和旧节点 next)。
注:旧头节点置空是为了什么呢?已经无用的 Node 能够被 GC 回收掉。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
AQS 之 ConditionObject
在 AQS 中,还有一个 ConditionObject 类,顾名思义是一个条件变量,提供了类似于 Object wait/notify 的机制。
ConditionObject 实现了 Condition 接口,提供了多种条件变量等待接口:可中断、不可中断、超时机制,以及两种唤醒机制:单个、全部。
public interface Condition {
// 在条件变量上等待
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒
void signal();
void signalAll();
}
await
await 方法类似于 Object#wait 方法,调用之前需要先获得锁。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程添加到条件队列里
Node node = addConditionWaiter();
// 是否锁资源,也就是说调用 await 之前要先获得锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 循环阻塞等待,直到被中断,或进入 AQS 锁的同步队列
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// signal 之后,进入同步队列,再通过 acquireQueued 竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 中断处理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signal
signal 方法类似于 Object#notify 方法,主要功能是唤醒下一个等待中的线程。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 唤醒一个节点
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
// 唤醒该节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
用法
ConditionObject 常用在各种队列的实现中,比如 ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue等,这里我们以 ArrayBlockingQueue 为例学习一下其用法。
请看下面 ArrayBlockingQueue 的代码里锁和条件变量的用法,条件变量是跟锁绑定的,这里一个锁对应多个条件变量:队列非空、队列非满。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
}
在向队列中存放元素时:
- 获得锁
- 如果队列满,则调用 notFull.await() 等待
- 队列不满时,则插入数据,并且向 notEmpty 条件变量发 signal 信号
- 解锁
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
在从队列里取元素时:
- 加锁
- 如果队列空,则调用 notEmpty.await() 等待
- 队列非空,则取数据,并且向 notFull 条件变量发送 signal 信号
- 解锁
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
总结
AQS 框架提供了先进先出的阻塞锁实现,在此基础上,提供了独占和共享等多种模式供使用方实现。除此之外,还提供了一个条件变量的实现。
锁是一种线程同步机制,用于保护对临界资源的访问。条件变量提供了一个“等待 - 唤醒”的机制,在阻塞队列里起到了生产者和消费者之间的通信的作用。
网友评论