1 数据结构
- 阻塞的线程存在哪里?
- AQS内部有一个Node类的FIFO双向队列,AQS依赖它同步状态。
- 假如当前线程获取同步状态失败,AQS会将当前线程等待等信息构造成一个节点(Node)将其加入到同步队列,同时阻塞当前线程
- 当同步状态释放,会把首节点唤醒(公平锁),使其再次获取同步状态。
- AQS内部有一个Node类的FIFO双向队列,AQS依赖它同步状态。
static final class Node {
/** 标记节点是共享模式下等待 */
static final Node SHARED = new Node();
/** 标记节点是在独占模式下等待 */
static final Node EXCLUSIVE = null;
/** 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态; */
static final int CANCELLED = 1;
/** 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 */
static final int SIGNAL = -1;
/** 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中 */
static final int CONDITION = -2;
/**
* 表示下一次共享式同步状态获取将会无条件地传播下去
*/
static final int PROPAGATE = -3;
/**
*等待状态
*/
volatile int waitStatus;
/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 获取同步状态的线程 */
volatile Thread thread;
/**
* 存储condition队列中的后继节点;
*/
Node nextWaiter;
/**
* 共享模式返回true
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 获取前继节点, 前驱节点不为空的时候使用
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // 用于建立初始标头或SHARED标记
}
Node(Thread thread, Node mode) { // 给addWaiter 使用
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // 给Condition 使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
2 AQS 独占用锁实现
2.1 acquire方法
- 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); // 根据返回 中断当前线程
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
-
主要工作
- 尝试获取锁
- 获取失败,执行acquireQueued
-
ReentrantLock.FairSync tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// hasQueuedPredecessors 有别的线程在队列中排了当前线程之前
// cas设置状态为acquires,即lock方法中写死的1
// 成功则 设置当前线程独占锁。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判断是不是重入,当前线程已经持有锁, 不需要cas
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- addWaiter 根据给的的共享,或者独占模式 创建节点,并加入到队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- acquireQueued方法 循环的尝试获取锁,直到成功为止,最后返回中断标志位
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//中断标志
boolean interrupted = false;
for (;;) {
// 获取前继节点
final Node p = node.predecessor();
//如果前继节点是head,则尝试获取
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果p不是head或者获取锁失败,判断是否需要进行park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- shouldParkAfterFailedAcquire 获取资源失败,什么时候park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前一个节点的状态是SIGNAL,则需要park
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// waitStatus == 1 因为超时或者中断,节点会被设置为取消状态, 删除状态是已取消的节点。
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 其他情况,设置前继节点的状态为SIGNAL。 不需要park
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- parkAndCheckInterrupt park并且校验中断
//判断线程中断的状态实际上是为了不让循环一直执行 让他阻塞, 一直cas cpu飙升
private final boolean parkAndCheckInterrupt() {
// 当前线程是非中断状态,则在执行park时被阻塞
//如果当前线程是中断状态,则park方法不起作用,会立即返回,然后parkAndCheckInterrupt方法会获取中断的状态,也就是true,并复位;
LockSupport.park(this);
return Thread.interrupted(); //返回中断标识并进行复位
}
- cancelAcquire如果在循环的过程中出现了异常,则执行cancelAcquire方法,用于将该节点标记为取消状态
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
// 通过前继节点跳过取消状态的node pred 状态 waitStatus<=0
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 取过滤后的前继节点的后继节点 predNext.waitStatus = 1
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 设置状态为取消状态
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// 1.如果当前节点是tail:
// 尝试更新tail节点,设置tail为pred;
// 更新失败则返回,成功则设置tail的后继节点为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果当前节点不是head的后继节点
// 判断当前节点的前继节点的状态是否是SIGNAL,如果不是则尝试设置前继节点的状态为SIGNAL
// 上面两个条件如果有一个返回true,则再判断前继节点的thread是否不为空
// 若满足以上条件,则尝试设置当前节点的前继节点的后继节点为当前节点的后继节点,也就是相当于将当前节点从队列中删除
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// // 3.如果是head的后继节点或者状态判断或设置失败,则唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
网友评论