流程图
未命名文件 (7).png入口
直接new一个ReentrantLock,可以发现创建了一个非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
而这个非公平锁继承了这个sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
即ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的,Sync类继承自AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
}
Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁)
FairSync
调用公平锁的lock方法
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
// ...
}
AbstractQueuedSynchronizer# acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantLock.FairSync# tryAcquire
尝试获取锁,返回值为true,代表获取成功,可能有两种情况:
- 没有现存等待锁
- 重入锁,线程本来就持有锁,可以理所当然直接获取
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 1. 到这里说明此事没有线程池有锁,但是因为是公平锁,需要先看看是否CLH同步队列中有线程在等待
if (!hasQueuedPredecessors() &&
// 2. 如果没有线程在等待,则CAS获取锁,成功了就获取了
// 没有CAS成功的话,说明刚才尝试CAS的时候,另一个线程也在CAS,并且另一个线程抢先获得了锁,则直接走到最下面返回false
compareAndSetState(0, acquires)) {
// 3. 获取到锁以后,通知大家,现在是我获取到了锁
setExclusiveOwnerThread(current);
return true;
}
}
// 这里说明已经有线程获取到锁了,且获取到锁的线程是当前线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果没有获取到锁,则直接继续进入AQS的代码,去进入等待队列
image.pngAbstractQueuedSynchronizer# addWaiter
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
AbstractQueuedSynchronizer# enq
private Node enq(final Node node) {
// 自旋的方式入队
for (;;) {
Node t = tail;
// CLH同步队列未初始化
if (t == null) { // Must initialize
// 初始化一个空的头结点
if (compareAndSetHead(new Node()))
// 让头指针和尾指针指向这个空节点,这个初始化执行完,因为没有return,就继续自旋
tail = head;
} else {
// 初始化完以后进入这里,这里不断通过CAS尝试,将自己设置为新的尾结点,不成功的话,就继续自旋,继续让前驱指针指向新的尾结点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
执行完上面的操作,开始准备执行acquireQueued这个方法
image.png关键词
- AQS维护的同步队列是一个FIFO先进先出队列
- CLH同步队列中的节点的waitStatus等待状态默认是0
CLH同步队列中节点获取锁步骤
假设当前有两个线程,线程A和线程B,线程A已经获得锁,线程B获取锁失败,构建成Node节点进入CLH同步队列,同时通过enq()方法以后,CLH队列中如下图
image.png
final boolean acquireQueued(final Node node, int arg) {
// 这个时候node节点是上图中的tail节点,也就是线程B所在的节点,arg=1
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 查看当前node节点的前置节点p
if (p == head && tryAcquire(arg)) {
// 如果前置节点是头部节点,B线程进行尝试获取锁
// 如果!是如果!
// 尝试获取成功了,也就是A线程执行完成,然后设置当前当前可执行的节点是B线程
setHead(node);
// 把node节点的线程设置为null,然后head指到node上
p.next = null; // help GC
// 回收操作
failed = false;
// 返回,然后将执行的将会是B线程
return interrupted;
}
// 按照我们的线程ABC的样例,上面的if肯定不会执行
// 1、因为即使node的前节点是head,但是也无法获取到锁,因为一直被A线程占有者
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果shouldParkAfterFailedAcquire返回true,则挂起,否则不挂起线程
interrupted = true;
// 2、第一次循环设置了p节点状态位-1,然后B线程循环至这里被挂起
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 返回true,挂起,返回false则不挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前节点的状态=-1,返回true,直接挂起即可
return true;
if (ws > 0) {
// 前置节点状态有>0,说明前驱节点取消了排队,移除掉该节点
do {
node.prev = pred = pred.prev;
// 拆分为2步
// pred = pred.prev
// node.prev = pred
} while (pred.waitStatus > 0);
// 当前节点开始往前扫描,从双向队列中除去>0的节点,直到<=0
pred.next = node;
} else {
// 利用CAS设置前节点状态位-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上面代码acquireQueued中如果线程B获取锁失败,则判断是否挂起B,通过B的前驱节点head的等待状态,默认为0,然后将其设置为-1,在第二次获取失败的时候(因此锁还在被线程A占用),直接shouldParkAfterFailedAcquire返回true,并将B线程挂起,直到线程A释放锁,调用unpark唤醒CLH同步队列中第一个等待的有效节点B,此时线程B从836行醒过来,继续执行837行返回,并最终在accquireQueued中获得锁
image.png节点入同步队列变化
- 如果线程A获取到锁,B线程在acquireQueued方法的时候,线程B初始化了head头结点,此时head==tail,并且由于此时的waitStatus没设置,因此java默认设置waitStatus==0
-
线程B入队
image.png
我们可以看到在线程B入队列以后,此时head结点的waitStatus发生了变化。在
shouldParkAfterFailedAcquire被执行的时候,线程B将前驱结点,即head结点的waitStatus设置为-1
- 如果线程C此时再进来,直接插到线程B后面,此时线程C的waitStatus==0
然后C同样需要执行acquireQueued()。此时的节点是线程C,其前驱节点是线程B,不是head,线程C直接运行到shouldParkAfterFailedAcquire方法中然后通过CAS方法设置B节点waitStatus为-1,并且之后又经过1次循环操作,最后返回true,此时队列为
image.png总结:enq方法让新的获取锁失败线程进队,acquireQueued方法让新的获取锁失败线程节点的前驱节点的等待状态为-1,并且挂起该新的获取锁失败线程
waitStatus中signal(-1)的意思是:代表后继节点需要被唤醒,即waitStatus记录的不是当前线程节点的状态,而是后继节点的状态。每个node入队的时候,都会将前驱节点的等待状态设置为SIGNAL,然后阻塞,等待被前驱节点唤醒
释放锁
ReentrantLock# unlock
public void unlock() {
sync.release(1);
}
AbstractQueuedSynchronizer# release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果头节点存在,且头节点的waitStatus不为0,即说明后面有节点需要唤醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
AbstractQueuedSynchronizer# tryRelease
protected final boolean tryRelease(int releases) {
// 当前的permit许可数量 - 要释放的数量
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 释放许可以后,不再有线程持有锁的话
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
执行完回到下面代码这里,开始执行unparkSuccessor
image.pngAbstractQueuedSynchronizer# unparkSuccessor(唤醒后继节点)
// 参数是头节点,从上图可以看到
private void unparkSuccessor(Node node) {
/*
说明有后继节点需要唤醒,唤醒前,先清除-1的等待状态
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
// 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)那么就从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
节点出同步队列变化
同步队列最开始没有元素的时候,构造了空的头节点,后面构建的新的获取锁失败线程节点等在该空节点后,之前持有锁的线程释放锁以后,唤醒该空的头节点后面的第一个线程B,并且清除该头节点,将唤醒线程所在的节点设置为新的头节点,当唤醒线程B执行结束释放锁的时候,此时清空的之前B节点改造的头节点信息,此时队列中仅仅存在一个C节点,并设置C节点为新的头节点,C节点线程C开始执行
image.png最后,当同步队列没有线程等待的时候,仅仅剩下了一个thread为空,head,tail指针都指向的空节点,之前的一系列操作等于初始化了该同步队列,下一次再来线程的时候,直接不用进enq,直接入队列CAS-tail
网友评论