AQS(同步器)是用来构建锁和其他同步组件的基础框架。它的实现主要是依赖一个int成员变量来标识同步状态和一个同步队列。同步器本身没有实现任何同步接口,仅仅是定义了几个protected修饰同步状态的获取和释放的方法来供同步组件使用。(状态的更新使用getState,setState以及compareAndSetState这三个方法。)
比如说锁:在锁的实现中聚合同步器,利用同步器实现锁的语义。锁是面向使用者的,它定义了使用者和锁的接口,但是隐藏了具体的实现细节。而同步器是面向锁的实现着,它简化了锁的实现方式,屏蔽了同步状态的管理,线程的排队、等待和唤醒等操作。
AQS使用模板方法设计模式,它将一些方法开放给子类去进行重写,而同步器给同步组件提供的模板方法又会重新调用子类重写的方法。例如:tryAcquire()
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
ReentrantLock中NonfairSync(继承AQS)会重写该方法为:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
而AQS中的模板方法acquire():
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
AQS提供的模板方法分为三类:
- 独占式获取与释放同步状态;
- 共享式获取与释放同步状态;
- 查询同步队列中等待线程情况;
AQS本身内部定义一个静态类Node,通过双向链式结构构成了同步队列。Node维护了几个属性:
volatile int waitStatus //节点状态
volatile Node prev //当前节点/线程的前驱节点
volatile Node next; //当前节点/线程的后继节点
volatile Thread thread;//加入同步队列的线程引用
Node nextWaiter;//等待队列中的下一个节点
节点的状态:
int CANCELLED = 1//节点从同步队列中取消
int SIGNAL = -1//后继节点的线程处于等待状态,
//如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行;
int CONDITION = -2//当前节点进入等待队列中
int PROPAGATE = -3//表示下一次共享式同步状态获取将会无条件传播下去
int INITIAL = 0;//初始状态
AQS重要成员变量:
private transient volatile Node head;
private transient volatile Node tail;
AQS实际上是通过头尾指针来管理控制同步队列,同时实现对获取锁失败的线程进行入队操作,释放锁是完成对同步队列中等待的线程进行通知等核心操作。
独占锁获取锁的分析
先调用acquire()方法,看是否获取同步状态(也就是是否加锁成功),如果成功直接返回,如果失败则在调用addWatier(),然后在调用acquireQueued()方法。
public final void acquire(int arg) {
//先看同步状态是否获取成功,如果成功则方法结束返回
//若失败则先调用addWaiter()方法再调用acquireQueued()方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter()源码如下:
private Node addWaiter(Node mode) {
// 1. 将当前线程构建成Node类型
Node node = new Node(Thread.currentThread(), mode);
// 2. 当前尾节点是否为null?
Node pred = tail;
if (pred != null) {
// 2.2 将当前节点尾插入的方式插入同步队列中
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 2.1. 当前同步队列尾节点为null,
//说明当前线程是第一个加入同步队列进行等待的线程
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//1. 构造头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 2. 尾插入,CAS操作失败自旋尝试
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter()、enq()分析如下:
- 当前同步对列的尾结点为null,调用enq()方法插入当前节点
- 当前对列的尾结点不为null,则采用compareAndSetTail()把当前节点插入同步队列的尾部,如果则采用compareAndSetTail失败则继续执行enq()方法,里面会继续采用自旋模式继续插入,直至成功为止。
当前节点(线程)已经插入同步队列,acquireQueued()方法保证同步队列的节点获取独占锁(排队获取锁的过程)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 1. 获得当前节点的先驱节点
final Node p = node.predecessor();
// 2. 当前节点能否获取独占式锁
// 2.1 如果当前节点的先驱节点是头结点
//并且成功获取同步状态,即可以获得独占式锁
if (p == head && tryAcquire(arg)) {
//队列头指针用指向当前节点
setHead(node);
//释放前驱节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2.2 获取锁失败,线程进入等待状态等待获取独占式锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
分析入下:
- 首先获取当前节点的前驱节点,如果先去节点是头结点并且已经成功获取同步状态(成功获取锁),当前节点也能获取锁,反之进入等待状态。
- 如果当前节点前驱几点已经获取锁,然后通过setHead()将当前节点设置为头结点,前驱节点出对,与同步队列断开,方便回收。
- 如果当前节点的前驱节点没有获取同步状态,shouldParkAfterFailedAcquire()方法,通过compareAndSetWaitStatus设置当前节点为signall也就是等待状态。
- parkAndCheckInterrupt()调用此方法通过LockSupport.part()将当前线程阻塞。

独占锁释放分析
release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor(h)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//头节点的后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//后继节点不为null时唤醒该线程
LockSupport.unpark(s.thread);
}
分析如下:
- compareAndSetWaitStatus()通过cas操作更改同步对列的同步状态。
- 获取当前节点的后继节点,如果后继节点不为空,则调用LockSupport.unpark()唤醒后继节点包装的线程。
总结:
- 线程获取锁失败,线程被封装成Node进行入队操作,核心方法在于addWaiter()和enq(),同时enq()完成对同步队列的头结点初始化工作以及CAS操作失败的重试;
- 线程获取锁是一个自旋的过程,当且仅当 当前节点的前驱节点是头结点并且成功获得同步状态时,节点出队即该节点引用的线程获得锁,否则,当不满足条件时就会调用LookSupport.park()方法使得线程阻塞;
- 释放锁的时候会唤醒后继节点;
总体来讲:
在获取同步状态时,AQS维护一个同步队列,获取同步状态失败的线程会加入到队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点是头结点并且成功获得了同步状态。在释放同步状态时,同步器会调用unparkSuccessor()方法唤醒后继节点。
网友评论