简介
AQS(AbstractQueuedSynchronizer) 是通过FIFO队列实现的一套框架,用于阻塞锁以及同步器比如semaphores、countdownlatch、lock的实现均依赖它。它通过一个原子的int值实现的同步,子类可以通过保护方法改变该int变量的值。
它提供了两种模式,分别为独占模式和共享模式,独占模式只允许被一个线程获取到而共享模式则允许多个线程获取。本文主要介绍的几个方法就是这两种模式的体现。
独占模式
实现方式:一个线程获取到锁并释放后,后续线程才能获取到锁,在队列中表示为当头结点将锁释放后会将后继结点唤醒
- void acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
# tryAcquire 为具体实现,就是对state变量的操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
#前置节点
final Node p = node.predecessor();
#前置节点为头节点并且资源可以获取到
if (p == head && tryAcquire(arg)) {
#将当前节点设置为头节点,前置节点不再维护了
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
#shouldParkAfterFailedAcquire作用就是将前置节点状态置为SIGNAL,下面详细分析,parkAndCheckInterrupt是将当前线程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
#节点状态为SIGNAL时返回
return true;
if (ws > 0) {
# 将取消的节点跳过
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
#前置节点状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
举例说明,当有两个线程A、B调用tryAcquire方法时,A、B按照顺序依次入队新来的始终排在队尾,只要A还没有获取到,那么A肯定是阻塞的,它的后继节点也肯定是阻塞的
画个图表示下:
image.png
- boolean release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
#tryRelease为具体实现,也是对state变量的操作
它的实现也是比较简单就是当资源拿到时,直接将链表的头部节点的后继节点唤醒,唤醒代码如下:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
#头节点状态置为0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
当后继节点被唤醒后,那么该节点对应的线程会继续执行acquireQueued方法中的for循环,这时当拿到资源后,那么就会将该节点设置为头节点,返回成功了
image.png
共享模式
实现方式:一个线程获取到锁后,会通知后继结点,后继结点获取到锁后继续向后传播,这样就能使得多个线程获取到同一个锁
- void acquireShared(int arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
# tryAcquire 为具体实现,就是对state变量的操作
#与acquireQueued的不同之处在于,该方法加入的节点类型为SHARED,另外就是在获取到资源>0时,该方法会将做传播操作
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
#r>0当前节点置为头节点,并传播,下面细说
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
#当前节点设置为头节点
setHead(node);
#当获取资源数大于0 或者头部不存在或者之前的以及当前的头节点状态小于0,则当前节点的后继节点需要被唤醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
#后继节点为共享类型
if (s == null || s.isShared())
doReleaseShared();
}
#作用就是将头节点状态从SIGNAL置为0,并唤醒后继节点,
#再将头节点状态置为PROPAGATE,这样当后继节点被唤醒后,
#它获取到资源后把自己置为头结点,继续重复之前动作唤醒后继节点
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
- boolean releaseShared(int arg)
上边已经介绍
过程如下图所示
初始阶段有3个线程都来调用acquireShared时,状态如下
image.png
当A获取到资源时,则
image.png
网友评论