前言
JDK并发包是基于两个基础类 Unsafe 和 LockSupport 实现的:
- Unsafe 提供内存CAS操作(底层由CPU的原子指令实现,而不是os系统调用)
- LockSupport 提供基于os系统调用的线程阻塞、唤醒
在上面两个核心类的基础上,JDK并发包基于链表实现的队列,实现了核心类 AbstractQueuedSynchronizer(AQS),这个同步器是并发包中大多数锁和同步工具类(如 ReentrantLock、Semaphore、CountDownLatch)的底层实现核心;
AQS 源码分析
由于JDK里AQS的源码风格比较像C++,易读性也比较差,我重新整理了下代码注释和结构,便于分析
package zxl.org.aqs;
/**
 * One entry of the AQS queues; every node stands for one waiting thread.
 * <p>
 * The same node type is used for the doubly linked sync queue (via {@code prev}/{@code next})
 * and for the singly linked condition queue (via {@code nextWaiter}).
 *
 * @author zhouxiliang
 * @date 2020/9/11 19:34
 */
public class Node {

    /** Wait status: the node gave up waiting. It is the only positive status value. */
    static final int CANCELLED = 1;
    /** Wait status: the successor of this node has to be unparked. */
    static final int SIGNAL = -1;
    /** Wait status: the node is waiting on a condition queue. */
    static final int CONDITION = -2;
    /** Wait status: a shared release should be propagated further. */
    static final int PROPAGATE = -3;

    /** Mode marker kept in {@code nextWaiter} for nodes that wait in shared mode. */
    static final Node SHARED = new Node();
    /** Mode marker for exclusive mode; exclusive nodes simply carry {@code null}. */
    static final Node EXCLUSIVE = null;

    /** One of the status constants above, or 0 for a freshly enqueued sync-queue node. */
    volatile int waitStatus;

    /** Links of the doubly linked sync queue. */
    volatile Node prev;
    volatile Node next;

    /** The thread this node represents (the key attribute of a node). */
    volatile Thread thread;

    /**
     * On the sync queue: {@link #SHARED} for shared nodes, {@code null} for exclusive ones.
     * On a condition queue: the next node of the singly linked list.
     */
    Node nextWaiter;

    /** Creates an empty node; used for the {@link #SHARED} marker and the initial queue head. */
    public Node() {
    }

    /**
     * Creates a sync-queue node.
     *
     * @param thread the waiting thread
     * @param mode   {@link #SHARED} or {@link #EXCLUSIVE}
     */
    public Node(Thread thread, Node mode) {
        // Plain write first, volatile write last - same order as before.
        this.nextWaiter = mode;
        this.thread = thread;
    }

    /**
     * Creates a node with an explicit wait status (used for condition-queue nodes).
     *
     * @param thread     the waiting thread
     * @param waitStatus initial status, e.g. {@link #CONDITION}
     */
    public Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }

    /**
     * @return {@code true} if this node waits in shared mode
     */
    public final boolean isShared() {
        return SHARED == nextWaiter;
    }

    /**
     * Returns the previous node of the sync queue.
     *
     * @return the predecessor, never {@code null}
     * @throws NullPointerException if this node has no predecessor
     */
    public final Node predecessor() throws NullPointerException {
        // Read the volatile field exactly once so the check and the result agree.
        final Node before = prev;
        if (before != null) {
            return before;
        }
        throw new NullPointerException();
    }
}
package zxl.org.aqs;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
/**
* Condition (wait) queue of the AQS. One sync queue can have any number of condition queues attached.
* <p>
* Extracted from the synchronizer into a top-level class for readability; it reaches the owning
* synchronizer through the {@code synchronizer} field.
*
* @author zhouxiliang
* @date 2020/9/11 19:43
*/
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// Timed waits with fewer than this many nanoseconds left re-loop instead of parking.
private static final long spinForTimeoutThreshold = 1000L;
// Interrupt mode: re-assert the interrupt flag when the wait returns.
private static final int REINTERRUPT = 1;
// Interrupt mode: throw InterruptedException when the wait returns.
private static final int THROW_IE = -1;
/**
* Internal state of the condition: head and tail of a singly linked list chained through nextWaiter.
*/
private transient Node firstWaiter;
private transient Node lastWaiter;
// Every condition belongs to exactly one synchronizer.
private AbstractQueuedSynchronizer synchronizer;
/**
* Creates a condition bound to the given synchronizer.
* (The inner class of the original source was extracted into its own class for readability.)
*
* @param synchronizer the synchronizer this condition belongs to
*/
public ConditionObject(AbstractQueuedSynchronizer synchronizer) {
this.synchronizer = synchronizer;
}
/**
* Core internal methods of the condition object.
*/
/**
* Appends a new node for the current thread to the condition queue.
* If the current last node is no longer in CONDITION state, the whole list is cleaned first.
*
* @return the newly created node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
// The last node left CONDITION state: sweep the whole list, then re-read the tail.
unlinkCancelledWaiters();
t = lastWaiter;
}
// Create a new node in CONDITION state for the current thread.
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) {
// Empty list: the new node is both head and tail.
lastWaiter = firstWaiter = node;
} else {
// Append behind the old tail; the condition queue is singly linked.
t.nextWaiter = node;
lastWaiter = node;
}
// Hand the new node back to the caller.
return node;
}
/**
* Removes every node that is no longer in CONDITION state from the singly linked list.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// Last node seen so far that is still in CONDITION state.
Node trail = null;
// Plain traversal that unlinks every non-CONDITION node.
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
} else
trail = t;
t = next;
}
}
/**
* Removes nodes from the front of the condition queue until one of them is successfully
* transferred to the sync queue, or the queue is empty.
* transferForSignal returns false when the node had already left CONDITION state, in which
* case the next node is tried.
*
* @param first the current first waiter, must not be null
*/
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!synchronizer.transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Like doSignal, but detaches the whole list and hands every node to the sync queue of the
* synchronizer.
*
* @param first the current first waiter, must not be null
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
synchronizer.transferForSignal(first);
first = next;
} while (first != null);
}
/**
* Implementation of the Condition interface.
*/
/**
* Moves the longest-waiting node, if any, to the sync queue; see doSignal.
*
* @throws IllegalMonitorStateException if isHeldExclusively() of the synchronizer returns false
*/
public final void signal() {
if (!synchronizer.isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Moves all waiting nodes to the sync queue; see doSignalAll.
*
* @throws IllegalMonitorStateException if isHeldExclusively() of the synchronizer returns false
*/
public final void signalAll() {
if (!synchronizer.isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* 1. Joins the condition queue.
* 2. Blocks until the node has been moved to the sync queue.
* 3. Blocks until the node is first in the sync queue and re-acquires.
* Interrupts never end the wait; the flag is restored before returning.
*/
public final void awaitUninterruptibly() {
// Enqueue the current thread on the condition queue.
Node node = addConditionWaiter();
// Release the synchronizer completely; release() decides whether a sync-queue thread is woken.
int savedState = synchronizer.fullyRelease(node);
boolean interrupted = false;
// Keep parking until the node shows up on the sync queue; only remember interrupts.
while (!synchronizer.isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
// acquireQueued loops (checking and parking) until this node is right behind the head and
// tryAcquire succeeds; it returns true if the thread was interrupted meanwhile.
if (synchronizer.acquireQueued(node, savedState) || interrupted)
synchronizer.selfInterrupt();
}
/**
* Same basic flow as awaitUninterruptibly.
* <p>
* The difference is the handling of the interrupt flag: an interrupted waiter stops waiting
* for a signal and moves itself to the sync queue.
* An interrupt does not always produce an exception: when signal and interrupt arrive at
* almost the same time and the signal wins, the flag is only re-asserted.
*
* @throws InterruptedException if the thread is interrupted on entry or before being signalled
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = synchronizer.fullyRelease(node);
int interruptMode = 0;
while (!synchronizer.isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// An interrupt seen while re-acquiring is only re-asserted, unless an exception is already due.
if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Checks for (and clears) an interrupt after a park.
*
* @param node the node of the current thread
* @return THROW_IE if interrupted and this thread moved the node off the condition queue itself,
* REINTERRUPT if interrupted but the node had already left CONDITION state,
* 0 if not interrupted
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(synchronizer.transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Applies the interrupt mode collected during the wait.
*
* @param interruptMode THROW_IE or REINTERRUPT; any other value is ignored
* @throws InterruptedException if the mode is THROW_IE
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
synchronizer.selfInterrupt();
}
/**
* Same flow as await(), with a time limit on the wait for a signal: once the time is used up
* the node is moved to the sync queue by the waiter itself.
*
* @param nanosTimeout maximum time to wait, in nanoseconds
* @return deadline minus the current nanoTime when the method returns; zero or negative after a timeout
* @throws InterruptedException if the thread is interrupted on entry or before being signalled
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = synchronizer.fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!synchronizer.isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
// Out of time: move the node to the sync queue ourselves.
synchronizer.transferAfterCancelledWait(node);
break;
}
// For very short remaining times skip the park and just re-check.
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* Same flow as awaitNanos, with an absolute wall-clock deadline.
*
* @param deadline the absolute time to wait until
* @return false if the deadline passed and this thread moved the node off the condition queue
* itself, true otherwise
* @throws InterruptedException if the thread is interrupted on entry or before being signalled
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = synchronizer.fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!synchronizer.isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
// true only if the node was still in CONDITION state, i.e. no signal came first.
timedout = synchronizer.transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* Same flow as awaitNanos, but reports whether the wait timed out instead of the remaining time.
* (The timed variants duplicate a lot of code.)
*
* @param time maximum time to wait
* @param unit unit of the time argument
* @return false if the time elapsed and this thread moved the node off the condition queue
* itself, true otherwise
* @throws InterruptedException if the thread is interrupted on entry or before being signalled
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = synchronizer.fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!synchronizer.isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = synchronizer.transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* Support methods for instrumentation.
*/
/**
* @param sync the synchronizer to compare with
* @return true if this condition was created for exactly that synchronizer instance
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == synchronizer;
}
/**
* Walks the whole list and checks whether any node is still in CONDITION state.
*
* @return true if at least one node is in CONDITION state
* @throws IllegalMonitorStateException if isHeldExclusively() of the synchronizer returns false
*/
protected final boolean hasWaiters() {
if (!synchronizer.isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
/**
* Walks the whole list and counts the nodes that are still in CONDITION state.
*
* @return the number of nodes in CONDITION state
* @throws IllegalMonitorStateException if isHeldExclusively() of the synchronizer returns false
*/
protected final int getWaitQueueLength() {
if (!synchronizer.isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
/**
* Walks the whole list and collects the threads of the nodes that are still in CONDITION state.
*
* @return a new list with the non-null threads of those nodes
* @throws IllegalMonitorStateException if isHeldExclusively() of the synchronizer returns false
*/
protected final Collection<Thread> getWaitingThreads() {
if (!synchronizer.isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
package zxl.org.aqs;
import sun.misc.Unsafe;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.locks.AbstractOwnableSynchronizer;
import java.util.concurrent.locks.LockSupport;
/**
 * Core implementation of the AQS.
 * <p>
 * The class keeps an {@code int} state plus a doubly linked queue of {@link Node}s (the "sync
 * queue"). Subclasses define what the state means through the {@code try*} hooks; this class does
 * the queueing, parking and waking. The control flow is not sequential: most methods cooperate
 * with code running concurrently in other threads.
 */
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * Remaining wait time, in nanoseconds, below which the timed acquire methods re-loop
     * instead of parking (1 ms = 1,000 us = 1,000,000 ns).
     */
    private static final long spinForTimeoutThreshold = 1000L;

    /** Low-level memory access used for the CAS operations below. */
    private static final Unsafe unsafe = loadUnsafe();

    /**
     * Field offsets inside the object layout, needed by the CAS calls.
     */
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    /**
     * Obtains the {@code Unsafe} singleton.
     * <p>
     * {@code Unsafe.getUnsafe()} rejects callers that were not loaded by the boot/system-domain
     * class loader with a {@link SecurityException}. This class lives in an application package,
     * so the direct call is tried first and the {@code theUnsafe} field is read reflectively as
     * a fallback.
     *
     * @return the Unsafe instance
     * @throws Error if neither way of obtaining the instance works
     */
    private static Unsafe loadUnsafe() {
        try {
            return Unsafe.getUnsafe();
        } catch (SecurityException denied) {
            try {
                java.lang.reflect.Field singleton = Unsafe.class.getDeclaredField("theUnsafe");
                singleton.setAccessible(true);
                return (Unsafe) singleton.get(null);
            } catch (Exception ex) {
                throw new Error(ex);
            }
        }
    }

    /** CAS on {@code node.waitStatus}. */
    private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
    }

    /** CAS on {@code node.next}. */
    private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

    /**
     * Decides, after a failed acquire, whether the thread of {@code node} should park.
     *
     * @param pred the predecessor of {@code node}
     * @param node the node of the current thread
     * @return {@code true} if the thread should park now; {@code false} if the caller should
     *         retry the acquire first
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // The predecessor already promised to wake us on release: safe to park.
            return true;
        if (ws > 0) {
            // The predecessor was cancelled: skip over all cancelled predecessors.
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Status is 0 or PROPAGATE: ask the predecessor for a signal, but do not park yet.
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // After relinking or changing the status the caller must retry before parking.
        return false;
    }

    /** Re-asserts the interrupt flag of the current thread. */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
     * The three core members of the AQS:
     * head and tail of the doubly linked sync queue, and the 32-bit state word.
     */
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;

    protected AbstractQueuedSynchronizer() {
    }

    /**
     * Installs the initial head with a CAS from {@code null}; an optimistic, lock-free update.
     *
     * @param update the node to install as head
     * @return {@code true} if head was {@code null} and has been replaced
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS on {@code tail}.
     *
     * @param expect the expected current tail
     * @param update the new tail
     * @return {@code true} on success
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /** @return the current synchronization state (volatile read) */
    protected final int getState() {
        return state;
    }

    /** @param newState the new synchronization state (volatile write) */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * CAS on {@code state}.
     *
     * @param expect the expected current state
     * @param update the new state
     * @return {@code true} on success
     */
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /**
     * Appends a node to the sync queue, initialising the queue if necessary.
     *
     * @param node the node to append
     * @return the previous tail, i.e. the predecessor of {@code node}
     */
    private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;
            if (t == null) {
                // Initialisation takes two steps: set head, then set tail. Other threads
                // test the volatile tail, which is written last, to see a fully
                // initialised queue.
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // Linking takes three steps (prev, tail CAS, next) and is not atomic as a
                // whole. The node counts as enqueued once the tail CAS succeeded, even though
                // t.next may not point to it yet - which is why some traversals walk
                // backwards from tail.
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates a node for the current thread and appends it to the sync queue.
     *
     * @param mode {@code Node.SHARED} or {@code Node.EXCLUSIVE}
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /** Makes {@code node} the head and clears the fields a head node does not need. */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Unparks the thread of the first non-cancelled node behind {@code node}, if any.
     * If the woken thread acquires successfully it will replace the head of the sync queue.
     *
     * @param node the node whose successor is to be woken
     */
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) {
            // Waking the successor consumes this node's status: SIGNAL/PROPAGATE -> 0.
            compareAndSetWaitStatus(node, ws, 0);
        }
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // next may be missing or cancelled while other threads modify the tail, so
            // search backwards from tail for the non-cancelled node closest to 'node'.
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null) {
            LockSupport.unpark(s.thread);
        }
    }

    /**
     * Release step for shared mode.
     * If the head is in SIGNAL state, its successor is woken; if the head's status is 0, it
     * is set to PROPAGATE. The loop repeats when the head changed in the meantime.
     */
    private void doReleaseShared() {
        for (; ; ) {
            Node h = head;
            // Queue is initialised and has at least one waiter.
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // A successor sets the head's status to SIGNAL before it parks.
                if (ws == Node.SIGNAL) {
                    // The SIGNAL must be consumed by exactly one thread before unparking.
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    // Wake the successor of the head.
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    // Status 0 usually means no successor has asked for a signal yet; the CAS
                    // to PROPAGATE failed, so the status changed concurrently - recheck.
                    continue;
                }
            }
            if (h == head) {
                // Head unchanged: done. Otherwise a woken thread has already replaced the
                // head, so loop again and keep waking waiters.
                break;
            }
        }
    }

    /**
     * Makes {@code node} the head and, when appropriate, propagates the shared release.
     *
     * @param node      the node that just acquired in shared mode
     * @param propagate the value returned by {@code tryAcquireShared}
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // Propagation of the shared acquire.
                doReleaseShared();
        }
    }

    /**
     * Cancels an acquire attempt and cleans up the queue.
     * The successors are either attached to a live predecessor or woken up.
     *
     * @param node the node to cancel, may be {@code null}
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null) {
            return;
        }
        node.thread = null;
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
            return;
        }
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                        (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0) {
                compareAndSetNext(pred, predNext, next);
            }
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }

    /**
     * Parks the current thread.
     *
     * @return {@code true} if the thread was interrupted; the flag is cleared
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /**
     * Acquires in exclusive, uninterruptible mode for a node that is already queued.
     * Only the node directly behind the head tries to acquire; on success it becomes the head.
     *
     * @param node the queued node of the current thread
     * @param arg  the acquire argument passed to {@code tryAcquire}
     * @return {@code true} if the thread was interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    // Woken up by an interrupt: remember it and keep trying.
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in exclusive, interruptible mode:
     * 1. enqueues an exclusive node for the current thread,
     * 2. competes for the position behind the head.
     *
     * @param arg the acquire argument
     * @throws InterruptedException if the thread is interrupted while parked
     */
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Same as {@code doAcquireInterruptibly}, with a time limit.
     *
     * @param arg          the acquire argument
     * @param nanosTimeout maximum time to wait, in nanoseconds
     * @return {@code true} if acquired, {@code false} if the time ran out
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                // For very short remaining times re-loop instead of parking.
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in shared, uninterruptible mode.
     * Same loop as the exclusive variant, except that a successful acquire goes through
     * {@code setHeadAndPropagate}, which may continue releasing shared waiters.
     *
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // Key step: thread A (this thread) acquired and may now wake thread B,
                        // the owner of the node directly behind it, at its park point below.
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    // Park point of thread B.
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Same as {@code doAcquireShared}, but an interrupt while parked aborts the acquire.
     *
     * @param arg the acquire argument
     * @throws InterruptedException if the thread is interrupted while parked
     */
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Same as {@code doAcquireSharedInterruptibly}, with a time limit.
     *
     * @param arg          the acquire argument
     * @param nanosTimeout maximum time to wait, in nanoseconds
     * @return {@code true} if acquired, {@code false} if the time ran out
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * The next four methods are the main extension points; subclasses keep their state in the
     * internal {@code int}. Each default implementation throws
     * {@link UnsupportedOperationException}.
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Extension point: whether the current thread holds the synchronizer exclusively.
     *
     * @return to be defined by the subclass; the default throws
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /**
     * Acquires in exclusive mode, ignoring interrupts.
     * Tries {@code tryAcquire} once as a fast path; on failure queues up and waits.
     *
     * @param arg the acquire argument
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Like {@link #acquire(int)}, but aborts with an exception when interrupted.
     *
     * @param arg the acquire argument
     * @throws InterruptedException if the thread is interrupted on entry or while waiting
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * Like {@link #acquireInterruptibly(int)}, with a time limit.
     *
     * @param arg          the acquire argument
     * @param nanosTimeout maximum time to wait, in nanoseconds
     * @return {@code true} if acquired, {@code false} if the time ran out
     * @throws InterruptedException if the thread is interrupted on entry or while waiting
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * Releases in exclusive mode. If {@code tryRelease} (subclass hook) succeeds, a thread of
     * the sync queue is woken when the head asks for it.
     *
     * @param arg the release argument
     * @return the value returned by {@code tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // A status of 0 means no successor asked to be woken.
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /** Acquires in shared mode, ignoring interrupts. */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Acquires in shared mode, aborting when interrupted.
     *
     * @throws InterruptedException if the thread is interrupted on entry or while waiting
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * Acquires in shared mode with a time limit.
     *
     * @return {@code true} if acquired, {@code false} if the time ran out
     * @throws InterruptedException if the thread is interrupted on entry or while waiting
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * Releases in shared mode.
     *
     * @return the value returned by {@code tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    // Queue inspection methods

    /** @return {@code true} if head and tail differ, i.e. at least one node is queued */
    public final boolean hasQueuedThreads() {
        return head != tail;
    }

    /** @return {@code true} if the queue has ever been initialised */
    public final boolean hasContended() {
        return head != null;
    }

    /** @return the thread of the first queued node, or {@code null} if none is found */
    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }

    /** Slow path of {@link #getFirstQueuedThread()}. */
    private Thread fullGetFirstQueuedThread() {
        Node h, s;
        Thread st;
        // The same check is deliberately made twice before falling back to the traversal.
        if (((h = head) != null && (s = h.next) != null &&
                s.prev == head && (st = s.thread) != null) ||
                ((h = head) != null && (s = h.next) != null &&
                        s.prev == head && (st = s.thread) != null))
            return st;
        // head.next was not usable: walk backwards from tail and keep the last thread seen.
        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread;
    }

    /**
     * @param thread the thread to look for
     * @return {@code true} if a node of that thread is found walking back from tail
     * @throws NullPointerException if {@code thread} is {@code null}
     */
    public final boolean isQueued(Thread thread) {
        if (thread == null)
            throw new NullPointerException();
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }

    /** @return {@code true} if the node behind the head exists, is exclusive and has a thread */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
                (s = h.next) != null &&
                !s.isShared() &&
                s.thread != null;
    }

    /** @return {@code true} if some other thread appears to be queued ahead of the current one */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
    }

    // Instrumentation and monitoring methods

    /** @return the number of nodes with a non-null thread, counted walking back from tail */
    public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

    /** @return a new list of the non-null threads of all queued nodes, tail first */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

    /** @return like {@link #getQueuedThreads()}, restricted to exclusive nodes */
    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /** @return like {@link #getQueuedThreads()}, restricted to shared nodes */
    public final Collection<Thread> getSharedQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /** @return the default string form followed by the state and whether the queue is empty */
    @Override
    public String toString() {
        int s = getState();
        String q = hasQueuedThreads() ? "non" : "";
        return super.toString() +
                "[State = " + s + ", " + q + "empty queue]";
    }

    // Internal support methods for Conditions

    /**
     * @param node a node that started out on a condition queue
     * @return {@code true} if the node is now on the sync queue
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // prev is set before the tail CAS in enq, so a non-null prev alone proves nothing.
        return findNodeFromTail(node);
    }

    /** @return {@code true} if {@code node} is reachable walking backwards from tail */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (; ; ) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

    /**
     * Moves a node from a condition queue to the sync queue on behalf of a signal:
     * switches the status from CONDITION to 0, enqueues the node, and makes sure its thread
     * gets woken.
     *
     * @param node the node to transfer
     * @return {@code false} if the node had already left CONDITION state, {@code true} otherwise
     */
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Append to the sync queue; enq returns the previous tail, i.e. our predecessor.
        Node p = enq(node);
        int ws = p.waitStatus;
        // If the predecessor is cancelled, or its status changed concurrently so SIGNAL could
        // not be set, wake the thread of 'node' directly so that it can resynchronise itself.
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    /**
     * Moves a node to the sync queue after its wait was cancelled (interrupt or timeout).
     *
     * @param node the node of the current thread
     * @return {@code true} if this call moved the node, {@code false} if the node had already
     *         left CONDITION state; in that case the method waits until it is on the sync queue
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        // Someone else won the CAS but may not have finished enq yet: wait for it.
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * Releases with the complete current state.
     * On success a thread of the sync queue may be woken (see {@link #release(int)});
     * on failure the node is marked CANCELLED.
     *
     * @param node the condition node of the current thread
     * @return the state value that was released
     * @throws IllegalMonitorStateException if {@code release} returns {@code false}
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    // Instrumentation methods for conditions

    /** @return {@code true} if the condition was created for this synchronizer */
    public final boolean owns(ConditionObject condition) {
        return condition.isOwnedBy(this);
    }

    /**
     * @return {@code condition.hasWaiters()}
     * @throws IllegalArgumentException if the condition does not belong to this synchronizer
     */
    public final boolean hasWaiters(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.hasWaiters();
    }

    /**
     * @return {@code condition.getWaitQueueLength()}
     * @throws IllegalArgumentException if the condition does not belong to this synchronizer
     */
    public final int getWaitQueueLength(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitQueueLength();
    }

    /**
     * @return {@code condition.getWaitingThreads()}
     * @throws IllegalArgumentException if the condition does not belong to this synchronizer
     */
    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitingThreads();
    }
}
网友评论