加锁操作(公平、独占锁情形下)
假设有T1、T2、T3三个线程按 T1 → T2 → T3 的先后顺序来抢锁:
- T1 → 通过修改state=1 获取锁成功
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
// T1只会在第一个判断之后返回
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 1、确保当前节点是CHL队列中的第一个线程
// 2、cas操作将state改为1
// 3、将exclusiveOwnerThread指向当前线程,标记当前持有锁的线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- T2 → CAS修改state=1失败(获取锁失败) → 构建双向链表队列:Head(Thread为null,waitState=-1)、tail(Thread为T2,waitState=0)→ 进入park阻塞
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
// tryAcquire: T2在第一个判断后发现拿不到锁(tryAcquire返回false)
// addWaiter: 则调用addWaiter尝试入队
// acquireQueued: 入队完成后,再次去尝试拿锁,如果拿到锁则出队,拿不到则park(阻塞)。此处注意acquireQueued返回值:如果是true代表被中断过,那么需要调用selfInterrupt中断当前线程??
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 由于是这个同步器第一次入队,因此pred(tail)肯定为空,那么T2会直接到这,去创建一个队列并入队
enq(node);
return node;
}
// 很经典的一个线程安全的双向链表构建范式!
// 特点:只有一处返回,即队列构建完成(构建队列的动作也可能在中间被其他线程给完成)并入队成功才返回head节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 先构建一个虚拟节点作为head节点,cas设置head节点
if (compareAndSetHead(new Node()))
// 如果成功了,将head和tail同时指向这个虚拟节点
tail = head;
} else {
// 说明队列已经构建完成,则cas尝试让当前节点入队(尾插)
// 也就是将上一个tail作为当前节点的prev节点
node.prev = t;
if (compareAndSetTail(t, node)) {
// 入队成功后,将上一个tail的下个节点指向当前节点,完成入队
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点就是head并且获取锁成功(tryAcquire方法前面已分析),则执行当前节点出队操作
if (p == head && tryAcquire(arg)) {
// 下面操作旨在将当前节点置为一个dummy node供head节点引用
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 拿不到锁,则
// 1、shouldParkAfterFailedAcquire:判断前驱节点waitStatus是否为-1,否则先改前驱节点的waitStatus为-1,在第二次进入这段逻辑判断则会返回true,进入parkAndCheckInterrupt
// 2、parkAndCheckInterrupt:将当前线程挂起
// (由于LockSupport.park(Thread)方法可被中断唤醒,因此如果当前线程被中断唤醒了,那么当前方法最终返回值则为true)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 这段比较复杂,待分析
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- T3 → CAS修改state=1失败(获取锁失败) → 入队:构建一个节点(Thread为T3,waitState=0),修改T2所在的节点(即tail节点)的waitState=-1,再将tail指向当前新建的节点 → 进入park阻塞
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
// T3此处与T2唯一的不同点在于入队操作时不需要再创建队列,只需创建节点尝试入队
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 由于T2已创建了队列,则tail不为空,那么尝试入队
// 可以发现下面这块代码和java.util.concurrent.locks.AbstractQueuedSynchronizer#enq的for循环中else分支基本一模一样,也就是尾插入队操作
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// T3除了在入队操作上不需再创建队列之外,其他的代码逻辑跟T2完全相同
网友评论