AQS
AbstractQueuedSynchronizer 提供了一个基于FIFO队列用于构建锁或其他相关同步装置的基础框架
继承关系
继承自AbstractOwnableSynchronizer
成员变量 | 序列号 | serialVersionUID |
线程(独占模式) | exclusiveOwnerThread | |
函数 | 获取独占线程 | getExclusiveOwnerThread() |
设置独占线程 | setExclusiveOwnerThread(Thread) |
数据结构
Sync双向队列 Condition单向队列Node
Node 每个被阻塞的线程都会被封装成一个Node结点,Node结点有前后Node的引用以及结点状态和结点对应线程 结点获取资源方式有共享式和独占式
节点状态 | 含义 | 值 |
---|---|---|
CANCELLED | 表示此节点代表的线程已经被取消 | 1 |
默认状态,无含义 | 0 | |
SIGNAL | 表示此节点的下一个紧邻节点应该被unpark | -1 |
CONDITION | 表示此节点的线程正在等待条件 (Condition单向队列的节点只有Condition和Cancelled两种情况) | -2 |
PROPAGATE | 表示下一个aquireShared应该无条件传播 | -3 |
成员变量
含义 | 变量 |
---|---|
队列首 | head |
队列尾 | tail |
资源 | state |
自旋时间(默认1000) | spinForTimeoutThreshold |
Unsafe实例 | unsafe |
内存偏移地址,用于CAS | stateOffset headOffset tailOffset waitStatusOffset nextOffset |
成员函数
公共API
作用 | 接口 | |
---|---|---|
资源 | 查询资源数 | getState |
独占资源 | 获取资源(独占,无视中断) | acquire(int 获取资源数) |
获取资源(独占,可中断) | acquireInterruptibly | |
获取资源(独占,可中断,限时) | tryAcquireNanos | |
释放资源(独占) | release | |
共享资源 | 获取资源(共享,无视中断) | acquireShared |
获取资源(共享,可中断) | acquireSharedInterruptibly | |
获取资源(共享,可中断,限时) | tryAcquireSharedNanos | |
释放资源(共享) | releaseShared | |
队列线程 | 队列是否为空 | hasQueuedThreads |
是否有线程在获取资源 | hasContended | |
获取队列中第一个非head线程 | getFirstQueuedThread | |
线程是否在队列中 | isQueued(Thread) | |
判断是否有其他线程排在当前线程前 | hasQueuedPredecessors | |
获取队列长度 | getQueueLength | |
获取队列中的线程集合 | getQueuedThreads | |
获取队列中的独占线程集合 | getExclusiveQueuedThreads | |
获取队列中的共享线程集合 | getSharedQueuedThreads | |
条件 | 并发器是否持有指定条件 | owns(ConditionObject) |
并发器中指定条件的队列是否为空 | hasWaiters(ConditionObject) | |
并发器中指定条件的队列长度 | getWaitQueueLength(ConditionObject) | |
并发起指定条件的队列中所有线程的集合 | getWaitingThreads(ConditionObject) |
开放给子类的API
作用 | 接口 |
---|---|
获取资源 | getState() |
设置资源 | setState() |
CAS设置资源 | compareAndSetState() |
子类需实现接口
作用 | 接口 |
---|---|
该线程是否正在独占资源。只有用到condition才需要去实现它。 | More ActionsisHeldExclusively() |
尝试获取资源(独占方式),返回是否成功 | tryAcquire(int) |
尝试释放资源(独占方式),返回是否成功 | tryRelease(int) |
尝试获取资源(独占方式),返回负数 | tryAcquireShared(int) |
共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。 | tryReleaseShared(int) |
ConditionObject
为AQS增加了控制线程挂起与放下的接口,实现Condition接口
Condition接口:从Object抽取出与同步相关的方法组成的接口
条件等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//节点入列,当前线程封装为Condition型节点作为尾节点的nextWaiter插入队列(队列为空则作为头节点,尾节点Canceled则向上兼并所有Cancelled节点)
Node node = addConditionWaiter();
//释放当前资源数的资源并保存在临时变量中
int savedState = fullyRelease(node);
int interruptMode = 0;
//节点不在Sync队列时——挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//入列获取资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
唤醒
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
//将节点状态更换为默认状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//入列
Node p = enq(node);
int ws = p.waitStatus;
//将节点前置节点设置为Signal节点,放下节点对应线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
源码分析
acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
先插队尝试获取,失败则以独占模式入队并排队获取,排队获取失败的话中断线程
acquire流程图以独占模式入队——addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter效果
排队获取资源
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//p是等候队列第一位——尝试获取资源
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
//未获取到资源,酌情考虑挂起线程或再次尝试获取
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//出现异常亦未获取到资源,取消获取
if (failed)
cancelAcquire(node);
}
}
acquireQueued流程图
尝试获取资源失败时判断是否应该挂起——shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//前一个节点状态为SINGAL——它释放资源会唤醒下一位,可以安心挂起
if (ws == Node.SIGNAL)
return true;
//前一个节点状态CANCELLED——已被取消,向前兼并所有CANCELLED节点,不可挂起
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
//前一个节点不是CANCELLED,设置为SINGAL,暂时不可挂起
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
取消获取资源——cancelAcquire
private void cancelAcquire(Node node) {
if (node == null)
return;
//释放线程
node.thread = null;
// 兼并节点前所有CANCELLED节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
//直接修改状态为CANNELED——无需CAS是因为取消获取资源优先级较高
node.waitStatus = Node.CANCELLED;
// 如果此节点是尾部节点直接移除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//要取消获取的节点不是等待中的第一个节点/前一个节点是SINGAL节点/前一个节点CAS为SAINGAL节点成功
//直接删除此节点,在前一个节点后拼解后续节点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//取消获取的节点是等待中的第一个节点/前一个节点不是SINGAL节点
//放下后续线程
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
Release
public final boolean release(int arg) {
//尝试释放资源,成功则放下后续节点并置换head
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
独占式获取释放资源只在释放时放下线程,因而通常情况下只有head持有资源而后续节点的线程统统被挂起
AcquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//将当前线程封装成Shared型节点入队
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//排在队首(前一位就是head)——尝试获取
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//第一位获取成功——设置为head并doReleaseShared
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//未成功获取——决定是否挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享式获取资源在成功获取资源时会调用doReleaseShared放下后续被挂起的线程,只要资源充分,可同时有多个线程持有
releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//head是Signal节点——设置为默认节点并释放后续节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
//head是默认节点——设置为Propagate节点
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
网友评论