AQS其实就是java.util.concurrent.locks.AbstractQueuedSynchronizer这个类
AQS框架都是基于父类AbstractQueuedSynchronizer实现的,
- 它的实现类包括ReentrantLock(独占锁),
- ReentrantReadAndWriteLock(独占锁与共享锁),
- Condition(看上面表的对比就知道作用类似于Object的wait和notify),
- 还有CountDownLatch类,
- 以及java的线程池ThreadPoolExecuter的内部类Worker;
AQS简核心是通过一个共享变量来同步状态,变量的状态由子类去维护,而AQS框架做的是:
- 线程阻塞队列的维护
- 线程阻塞和唤醒
共享变量的修改都是通过Unsafe类提供的CAS操作完成的。AbstractQueuedSynchronizer类的主要方法是acquire和release,典型的模板方法,
下面这4个方法由子类去实现:
//尝试获取独占锁,锁竞争时不一定能获取成功,成功则返回true,否则返回false
protected boolean tryAcquire(int arg)
//尝试释放独占锁,锁竞争时不一定能释放成功,成功则返回true,否则返回false
protected boolean tryRelease(int arg)
//尝试获取共享锁,锁竞争时不一定能获取成功,成功则返回true,否则返回false
protected int tryAcquireShared(int arg)
//尝试释放共享锁,锁竞争时不一定能释放成功,成功则返回true,否则返回false
protected boolean tryReleaseShared(int arg)
2.类的内部类
AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类。下面分别做介绍。
1.Node类
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 结点状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;
// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}
// 无参构造函数
Node() { // Used to establish initial head or SHARED marker
}
// 构造函数
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 构造函数
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
从Node结构prev和next节点可以看出它是一个双向链表,waitStatus存储了当前线程的状态信息
waitStatus
- CANCELLED,值为1,表示当前的线程被取消;
- SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
- CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
- PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
值为0,表示当前节点在sync队列中,等待着获取锁。
- ConditionObject类
3.类的核心方法
下面我们通过以下五个方面来介绍AQS是怎么实现的锁的获取和释放的
- 独占式获得锁
- 独占式释放锁
- 共享式获得锁
- 共享式释放锁
- 独占超时获得锁
- acquire函数
该函数以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
<img src="http://ovg6pv08q.bkt.clouddn.com/%E5%B1%8F%E5%B9%95%E5%BF%AB%E7%85%A7%202017-12-13%20%E4%B8%8B%E5%8D%884.05.44.png" width = "300" height = "290" alt="方法流程图" align=center />
首先执行tryAcquire方法,尝试获得锁。
如果获取失败则进入addWaiter方法,
构造同步节点(独占式Node.EXCLUSIVE),
将该节点添加到同步队列尾部,并返回此节点,
进入acquireQueued方法。
acquireQueued方法,这个新节点死是循环的方式获取同步状态,如果获取不到则阻塞节点中的线程,阻塞后的节点等待前驱节点来唤醒或阻塞线程被中断。
addWaiter方法代码如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//尾节点
Node pred = tail;
如果尾节点不为空 将node 节点设置成为尾节点
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果更新失败(存在并发竞争更新),则进入enq方法进行添加
enq(node);
return node;
}
//enq方法代码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//如果队列为空,则通过CAS把当前Node设置成头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//如果队列不为空,则向队列尾部添加Node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 该方法使用CAS自旋的方式来保证向队列中添加Node(同步节点简写Node)
// 如果队列为空,则把当前Node设置成头节点
// 如果队列不为空,则向队列尾部添加Node
acquireQueued 代码:
- 通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。
- 进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。
- 在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//找到当前节点的前驱节点
final Node p = node.predecessor();
9 //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
//如果p节点是头节点且tryAcquire方法返回true。那么将当前节点设置为头节点。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;//返回等待过程中是否被中断过
}
//意味着 不是 老二,自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//设置被中断过
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态,
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常 等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用park()使线程进入waiting状态
return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}
整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
acquireQueued()方法总结
- 结点进入队尾后,检查状态,找到安全休息点;
- 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
- 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
acquire() 方法总结
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
release(int)
acquire()的反操作 release(),此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
public final boolean release(int arg) {
// tryReease由子类实现,通过设置state值来达到同步的效果。
if (tryRelease(arg)) {
Node h = head;
// waitStatus为0说明是初始化的空队列
if (h != null && h.waitStatus != 0)
// 唤醒后续的结点
unparkSuccessor(h);
return true;
}
return false;
}
// 此方法用于唤醒等待队列中下一个线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//这里,node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//找到下一个需要唤醒的结点s
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
总结:用unpark()唤醒等待队列中最前边的那个未放弃线程
acquireShared(int)
doAcquireShared(int) -> 此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。下面是doAcquireShared()的源码:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入队列尾部
boolean failed = true;//是否成功标志
try {
boolean interrupted = false;//等待过程中是否被中断过的标志
for (;;) {
final Node p = node.predecessor();//前节点
if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
int r = tryAcquireShared(arg);//尝试获取资源
if (r >= 0) {//成功
setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
p.next = null; // help GC
if (interrupted)//如果等待过程中被打断过,此时将中断补上。
selfInterrupt();
failed = false;
return;
}
}
//判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:
负值代表获取失败;0代表获取成功,但没有剩余资源;
正数表示获取成功,还有剩余资源,其他线程还可以去获取。
所以这里acquireShared()的流程就是:
- tryAcquireShared()尝试获取资源,成功则直接返回;
- 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。
setHeadAndPropagate(Node, int)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);//head指向自己
//如果还有剩余量,继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!
releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//尝试释放资源
doReleaseShared();//唤醒后继结点
return true;
}
return false;
}
此方法 --> 释放掉资源后,唤醒后继。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//唤醒后继
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // head发生变化
break;
}
}
实例:
Mutex(互斥锁)
public class Mutex implements Lock {
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -4387327721959839431L;
// 是否处于占用状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当状态为0的时候获取锁
@Override
public boolean tryAcquire(int acquires) {
assert acquires == 1;// 这里限定只能为1个量
if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
return true;
}
return false;
}
// 释放锁,将状态设置为0
// 尝试释放资源,立即返回。成功则为true,否则false。
@Override
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
网友评论