前面在写线程池时,有个Worker
的内部类,其除了实现Runnable接口还继承了一个AbstractQueuedSynchronizer
类来控制中断(这点我到现在还没想明白是为啥, 中断一个AQS的线程并不需要获得锁哇)<已有思路详见死磕源码系列-ThreadPoolExecutor 最后补充>
这里来看下AbstractQueuedSynchronizer
类的源码~
- 本篇是
AQS
之独占模式
- 节点
- 获取
- 释放
注释
AbstractQueuedSynchronizer
抽象队列同步器, 其实应该把源码上的注释都看一遍, 但英语差....
这里就简单看下第一段(误差之处, 恳请指出)
Provides a framework for implementing blocking locks and related
synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues. This class is designed to
be a useful basis for most kinds of synchronizers that rely on a
single atomic {@code int} value to represent state. Subclasses
must define the protected methods that change this state, and which
define what that state means in terms of this object being acquired
or released. Given these, the other methods in this class carry
out all queuing and blocking mechanics. Subclasses can maintain
other state fields, but only the atomically updated {@code int}
value manipulated using methods {@link #getState}, {@link
#setState} and {@link #compareAndSetState} is tracked with respect
to synchronization.
- 首先,这是一个框架,用于实现阻塞锁与一些使用先进先出策略(
FIFO
)阻塞队列的同步器(比如semaphores, events
) - 使用一个
int
类型的变量来表示状态(这个状态我理解是队列中节点的状态) - 子类必须定义更改此状态的受保护方法(比如
tryAcquire
),并定义该状态对于获取或释放此对象而言意味着什么。(比如在ReentrantLock
中state
的值就表示重入次数) - 子类可以维护其他状态字段,但是仅跟踪关于同步的使用方法{@link #getState},{@ link #setState}和{@link #compareAndSetState}操作的原子更新的{@code int}值。(也就是那唯一的一个状态字段
state
)
节点
既然是一个队列,那就肯定有一个描述单个元素的数据结构,在这就是Node
类,先来看下其结构
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
归纳一下
// 静态常量
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// waitStatus 属性
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 主要属性
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
- 独占模式中,
waitStatus
只用到了两种状态CANCELLED
和SIGNAL
(其实还有一种初始状态 0,表示当前只有一个头结点) -
prev
和next
表示这个队列其实是个双向链表 -
thread
代表封装内的线程,获取锁、挂起等待、唤醒与被唤醒、释放锁的执行单元都是线程 -
nextWaiter
在独占模式下永远为null
(EXCLUSIVE = null
)
下面就以ReentrantLock
中的非公平锁使用来看下AQS
锁的获取与释放.
ReentrantLock
中的公平与非公平区别就在新加入的任务是否拥有抢占锁的机会. 其他比较网上文章比较多~~~
获取
lock&acquire
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 可以看到非公平锁上来直接
CAS
操作试图获取锁, 这是第一次 - 失败进入
acquire
方法, 这是AQS
内部的final
方法, 禁止重写哦 -
acquire
方法中有三个方法, 我们一一来看下
tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
-
AQS
中并没有抽象方法, 所有子类需要用到且又需要子类重写的方法都定义为了protected
, 其内部直接抛出一个异常. 如果你要用, 你就要自己重写, 可见此方法是需要子类重写的. 看下ReentrantLock
中的实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 判断当前是否有锁,没有直接尝试加锁(第二次不公平操作)
- 否则判断是否当前线程持有锁,是则重入,否即返回
addWaiter
addWaiter(Node.EXCLUSIVE)
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
既然获取锁失败,那么当然要封装成一个Node
进行队列中等待
- 独占模式下,
Node
的构造方法中其nextWaiter
永远是null
- 然后判断是否队空,若不为空则进行
CAS
入队操作 - 此方法返回的是代表当前线程的节点
enq
执行到这里有两种可能:1、队空;2、CAS
失败
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 自旋+
CAS
入队 - 队空时创建了一个空节点作为
head
, 这是CLH 队列
的变种, 头部为一个哑结点(dummy node
) - 队不空的话就继续
CAS
操作, 直到所有节点全部入队 - 此方法返回的是当前节点的前驱节点,所以此时有可能已经是实际上的第一个节点了,满足
node.prev == head
再次进行抢锁
这里有个很好玩的现象,称为尾分叉
,由于此时是没有获取到锁的,所以此时的代码执行都是非线程安全的,而入队分为三步:
- 将
node
的prev
指向尾节点 -
CAS
更新尾节点 - 将之前尾节点的
next
属性指向node
假设现在有两个线程T1
/T2
同时执行入队操作, 都完成了第一步, 然后第二步中T1
成功(N1
成为新的tail
), T2
失败, 然后此时线程的时间片用完被切走, 那么此时的队列就是这样的:
- 队列尾节点为
N1
, 其prev
为t
节点, 其next
为null
-
t
节点为队列倒数第二个节点, 其next
为null
-
N2
还没有入队, 其prev
为t
节点
就有两个节点的prev
指向了t
节点,也就是分叉
了。
而如果此时有线程从头节点开始遍历队列,那么刚入队的N1
是不是就访问不到了,因为t
节点的next
为null
(还没有执行第三步)
而只要一个节点入队成功,那么其prev
属性是一定有值(执行前一个尾节点,此处就是t
节点)
所以
AQS
中很多遍历队列的操作都是从尾部开始遍历,就是为了在这种情况下也能访问全部的入队节点
-
enq
方法返回的是当前节点的前驱 -
addWaiter
方法返回的是当前尾节点
acquireQueued
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 从开始的获取锁失败到这一步已经经过了漫长岁月, 再判断一下前置节点是否为
head
节点, 如果是则再次尝试获取锁 - 获取成功了就执行
if
块内逻辑
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
- 替换
header
节点, 当前执行线程保存在exclusiveOwnerThread
中
否则需要判断是否需要挂起当前Node
shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这个方法的字面意思是抢锁失败后是否需要挂起。其实可以换种说法,挂起之后如何保证在合适的时机被唤醒?难道挂起之后从此不问世事?让我们明确一个事实,当前为独占模式,只有一个线程代表的节点能获得执行,那么此节点与下一个将要执行的节点之间是不是可以加上点联系呢?
AQS
通过waitStatus
字段来添加此联系。
Node.SIGNAL 的注释是这样的
waitStatus value to indicate successor's thread needs unparking
- 表明后继节点需要被唤醒.
那么问题就简单了,我只需要保证在我挂起之前将我前驱节点的waitStatus
设置为Node.SIGNAL状态就行了。
这里还有一个问题,如果当前节点的前驱节点被超时或被中断了咋办?也不难,那就往前找一个可以设置为Node.SIGNAL的节点(能执行能释放)作为我的前驱节点就是了。
继续看代码。
- 首先,如果前驱直接就是Node.SIGNAL,那么返回
true
- 如果
waitStatus
大于0
,说明此节点已经被取消了,不参与锁的抢占与任务执行了,那怎么能当我的前驱呢?换 ! 怎么换?从后往前遍历找到第一个不大于0
的,也就是0
(header
节点初始化时候的状态)或Node.SIGNAL(-1
), 这二者都可以被设置为Node.SIGNAL或已经是Node.SIGNAL - 状态为
0
时直接CAS
修改为Node.SIGNAL - 这里之所以往前找有两个原因: (1)拿到的就是尾节点 (2)防止尾分叉情况下漏节点
- 注意看这段代码要配合着
acquireQueued
中的for(;;)
一起看,是需要多次重试 - 只有前驱节点设置为了Node.SIGNAL才能返回
true
,才能进入下一个方法parkAndCheckInterrupt
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- 灰常简单,挂起当前线程
注意,执行到这一步,线程就停了(
WAITING
状态),下面都是被唤醒或被中断后继续执行的操作。
- 返回中断标识,注意这里的
interrupted
方法会重置中断标识位
注意这里返回的中断标识是给哪边用的,我们一层层返回会发现到这一层:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
返回true会执行selfInterrupt
方法
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
- 就简单的中断一下
说明线程在等待入队或入队过程是不会响应中断的,而是直到入队后才来检查一下是否曾经被中断过,有就再自我中断一下,即传说中的延迟中断
acquireQueued
方法还有一个问题,有个finally
块
if (failed)
cancelAcquire(node);
正常情况下,前面for(;;)
退出的唯一情况是
failed = false;
return interrupted;
也就是failed
永远为false
,那么cancelAcquire
方法永远无法执行,而如果执行过程中又不响应中断,难道我哪里想岔了?
释放
独占锁的释放,在ReentrantLock
中的公平锁与非公平是一样的
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 调用
tryRelease
释放锁,只有完全释放才返回true
(重入多少次就要释放多少次)
tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
- 满足条件后调用
unparkSuccessor
h != null && h.waitStatus != 0
-
h
为null
表明队列为空, 没有任务在等待, 也就不需要唤醒 -
h
的waitStatus
为0
表明没有后续节点在等待(如果有, 这个waitStatus
就是Node.SIGNAL) - 注意这里往下传入的是头节点
所以唤醒后继线程的条件是还有后继节点在等待抢占锁,即等待唤醒。
unparkSuccessor
唤醒后继节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 如果当前节点为Node.SIGNAL, 先
CAS
置为0
- 如果直接后继节点已经取消(
ws > 0
),那么从队列尾部往前遍历查找一个最靠近当前节点并且没有取消的节点,唤醒它。
尾遍历时并不是直接唤醒找到的第一个可以唤醒的节点,而是最靠近当前节点且可以唤醒的节点,这就体现出了先进先出(
FIFO
)
总结
- 获取锁
- AQS 自身维护一个同步队列(sync queue),所有未能获得锁的线程都会被放入此队列中
- 独占模式允许同一个线程多次获取锁,此时的 state 表示重入次数;共享模式允许多个线程同时获取锁,此时的 state 表示获取锁的线程个数(一定会记录一个线程数量,但不一定只记录线程数量,比如 ReentrantReadWriteLock 的 ReadLock)
- 是否公平取决于新入线程是否有直接抢占锁的机会,所谓公平是需要判断当前队列是否存在未获得锁的正常线程(未超时 & 未取消)
- addWaiter 方法返回的是当前入队节点,返回时可能已经不是尾节点了(其他等待线程入队成功);enq 方法返回的是当前入队节点的前驱节点,上一任尾节点
- 入队成功后、挂起前还有一次机会抢占锁,前提是其已经为当前队列实际上的第一个节点。这里无关公平而是同步队列的先进先出,公平只体现在新入线程身上,入队以后就是真正的公平了
- 释放锁
- 同步队列真正的头节点是个空节点,初始化时并未对 waitStatus 赋值,其 waitStatus 值为 int 类型的初值 0,而等待线程新获得锁后升级为头结点时同样没有处理 waitStatus,所以为等待线程的 waitStatus 原值。所以在释放锁时是否唤醒后继节点的判断中需要看下头结点的 waitStatus 是否为 0 来判断同步队列是否为只有一个头结点的空队列
- 先处理状态 state,如果是 Node.SIGNAL 则 CAS 为 0,这步操作的结果其实无关紧要但是一定要有。
2.1 先说为什么结果无关紧要,因为唤醒后继节点后新的线程获得锁后会自动升级为头结点,其 waitStatus 一起替换了,而代码实现确实没有保证此次 CAS 一定要成功(It is OK if this fails or if status is changed by waiting thread.
)。至于为什么会失败?独占模式下不会失败,因为释放的前提就是拿到了锁,而独占模式只有一个线程能获得锁。推测应该是共享模式下,多个获取锁的线程进行释放锁造成头节点的 waitStatus CAS 失败,不过这也不影响就是了
2.2 再说为什么一定要有。如果后继节点全部取消,如果没有这步CAS,那么当前同步队列就只有一个头结点的空队列,而这个头结点的 waitStatus 为 Node.SIGNAL,很明显不合逻辑,一个没有后继节点需要唤醒的头结点其 waitStatus 应该是 0 - 再尝试直接唤醒头结点的直接后继节点,如果已经取消了则尾遍历找到最靠近头结点且未取消的那个节点,加以唤醒
- 然后重新争抢锁,是否成功又要看是否为公平锁了(公平锁肯定成功,非公平锁有机会成功)
又是一个十点多,继续加油~~~
网友评论