下面会通过一步步的场景进行分析ReentrantLock公平锁的源码,文章为比较长,做好准备。。。
首先ReentrantLock默认创建的就是非公平锁,所以要在ReentrantLock的构造函数中传入true,创建公平锁
ReentrantLock lock = new ReentrantLock(true);
lock.lock();
下面我们来一步步分析公平锁进行加锁 lock 的源码实现:
注意,会通过一个个线程进入lock的形式进行分析;
当线程T1调用 lock():注意T1是第一个去竞争锁的线程,后面的T2,T3。。默认都是在T1还没执行完的情况下去分析的;
public voidlock() {
sync.lock();
}
继而进入FairSync里面的lock()方法:
final voidlock() {
acquire(1);
}
接着进入acquire(1)方法,这里面才是重点:
public final voidacquire(intarg) {
if(!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
这个acquire(int arg)由四个方法构成:
tryAcquire:尝试获得锁
addWaiter:将当前线程封装成Node对象
acquireQueued:将当前线程表示的Node对象放到链表中进行排队,同时park当前线程;
selfInterrupt:相应线程中断;
代码块1 :分析acquire方法;
public final voidacquire(intarg) {
//首先会进入tryAcquire(arg)方法,看到这行注释,请把目光移到下面代码块2中。。。
//代码块2 返回了true,而!tryAcquire(arg) 将返回结果取了反,也就是true变为了false,所以该方法结束,当前线程T1已经占有锁;
if(!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
代码块2 :
tryAcquire中的源码:
protected final boolean tryAcquire(int acquires) {
//current是当前来获取锁的这个线程,也就是我们说的T1线程
final Thread current = Thread.currentThread();
//state 表示锁状态,默认是0
//因为T1是第一个进来加锁的,所以这里getState()得到的结果肯定等于0,所以 c = 0
int c = getState();
if (c == 0) {
//因为c=0,那么进入到这里
//hasQueuedPredecessors:作用主要是判断当前AQS队列中有没有其他线程在排队,具体看下面详细说明,请将目光移到 代码块3
//通过分析代码块3,得知hasQueuedPredecessors会返回false,而!hasQueuedPredecessors()取了反,所以这里的结果变成了true,
//接着进入了compareAndSetState,进行CAS操作,将state改变为1,表示占有锁 ,返回true
//compareAndSetState 返回了true ,进入到 setExclusiveOwnerThread,这里是将当前线程设置为占有锁线程,用来支持锁重入
//结果返回true,将true带回到代码块1
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
代码块3:
hasQueuedPredecessors中的源码:
public final boolean hasQueuedPredecessors() {
//因为T1是第一个进来加锁的线程,那么这里的tail = null , head = null,因为还没有线程去初始化他们,所以肯定是null;
Node t = tail;
Node h = head;
Node s;
//h != t 相当于 head != tail 相当于 null != null,所以这里肯定返回的是false,下面的判断就不用再看了,先分析T1进来的情况
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
然后返回false;带着false将目光回到代码块2
总结:
当第一个线程进来获得锁的时候,会将state由原来默认的0改为1,注意,此时并没有初始化链表,也就是说,如果线程都是交替执行,那么永远跟AQS队列无关;
假设 T1 还没有执行完,此时 T2 来了,也调用了 lock() ,T2 也要获得锁:
通过上面的分析,我们直接进入到这个方法:注意这里是T2线程,T1还没有执行完,锁还在T1那里
代码块1 :分析acquire方法;
public final voidacquire(intarg) {
//首先会进入tryAcquire(arg)方法,看到这行注释,请把目光移到下面代码块2中。。。
//代码块2 返回了false,而 !tryAcquire(arg) 将返回结果取了反,也就是false变为了true,这逻辑将进入到acquireQueued(addWaiter(Node.EXCLUSIVE),arg) ,
//首先会先进入到addWaiter(Node.EXCLUSIVE),请将目光转向代码块3中
//addWaiter(Node.EXCLUSIVE)经过代码块3和代码块4的分析,这里返回了当前线程T2表示的node对象,带着这个返回的node进入到acquireQueued方法当中,带着node 和 1,将目光移到代码块5
if(!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
代码块2 :
tryAcquire中的源码:
protected final boolean tryAcquire(int acquires) {
//当前线程,也就是T2线程
final Thread current = Thread.currentThread();
//因为T1线程还没有执行完,所以T1线程还持有锁,那么getState()返回的必然不是0,假设T1没有重入,那么getState()返回的是1 , c = 1
int c = getState();
// 因为 c = 1 ,所以逻辑不会进入到这里
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 因为 c = 1 所以来到这个判断,getExclusiveOwnerThread返回的当前持有锁的线程,也就是T1, current = T2,那么T2 == T1?当然不等于,返回false,带着false回到代码块1中
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
代码块3:
private Node addWaiter(Node mode) {
//将当前线程封装为Node对象,这个Node当中维护了当前线程的状态、前一个节点,后一个节点,当前线程的引用等等
Node node = new Node(Thread.currentThread(), mode);
//因为T1获得锁的时候,并没有初始化AQS队列,所以head节点和tail节点都是null,所以pred = tail = null;
Node pred = tail;
// pred = null ,所以这个条件不成立,将进入下面的enq方法
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//当线程第一次初始化AQS队列的时候,也就是队列中还没有任何等待的线程,enq方法很重要,请将目光移到代码块4:
enq(node);
//代码块4执行完,返回了当前线程T2表示的node节点,返回node,带着node回到代码块1中
return node;
}
代码块4:
private Node enq(final Node node) {
//这里是个死循环
for (;;) {
//第一次进入循环,tail 肯定为 null ,因为队列中没有其他等待的线程,所以 t = tail = null
//第二次进入循环,t = tail = head = new Node();
Node t = tail;
//第一次进入循环,t = null,所以会进入到里面的逻辑
//第二次进入循环,t = tail = head = new Node(),也就是不等于null,所以进入到else 逻辑;
if (t == null) { // Must initialize
//第一次进入循环,当tail = null 的时候,说明队列还没有被初始化,也就是AQS还没有任何线程进来排队过,通过compareAndSetHead 为 head 创建一个空Node,哨兵节点
//第一次进入循环,同时tail = head = new Node(),接着进入下一次循环
if (compareAndSetHead(new Node()))
tail = head;
} else {
//第二次循环会进入到这里,因为第一次循环中创建了哨兵节点head
// node.prev = t :当前节点的前驱节点指向 t = tail = head,也就是指向head节点
node.prev = t;
// 通过CAS将当前T2线程表示的节点node设置tail节点
if (compareAndSetTail(t, node)) {
//t.next = node ,t = head,所以将head节点的后继节点指向当前T2线程表示的节点
t.next = node;
//返回当前节点,带着当前线程T2表示的node,返回到代码块3中
return t;
}
}
}
}
代码块5:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//这里是一个死循环
for (;;) {
//第一次循环,获得当前节点的前驱节点,也就是head节点
//第二次循环,获得当前节点的前驱节点,也就是head节点
final Node p = node.predecessor();
//第一次循环,p == head ? 当然等于,所以进入tryAcquire尝试获得锁,因为前面的处理过程中,可能T1已经执行完了,这里其实也相当于自旋,我们假设T1还没有执行完,这里没有获取到锁返回false
//第二次循环,P == head ? 当然等于,所以再次进入到tryAcquire中尝试获得锁,又一次自旋,假设还是没有获得到锁,因为T1还没有执行完,锁还没有被释放
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//第一次循环,带着当前节点node的前驱节点head,和当前节点node,进入到shouldParkAfterFailedAcquire方法之中,这里很重要,请看代码块6
//第一次循环,从代码块6中的shouldParkAfterFailedAcquire返回了false,那么第一次循环结束
//第二次循环,带着当前节点node的前驱节点head,和当前节点node,再次进入到shouldParkAfterFailedAcquire方法之中,请看代码块6
//第二次循环,从代码块6中的shouldParkAfterFailedAcquire返回了true,代表着将进入parkAndCheckInterrupt方法将当前线程休眠,等待被唤醒,代码阻塞在里面,至此,T2进入睡眠;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
代码块6:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//外面第一次循环:pred = head , node 为 T2,因为T2刚刚才初始化了head节点,所以head节点中的waitStatus = 0;
//外面第一次循环:ws = 0 ;
//外面第二次循环:因为在第一次的时候,我们已经将head节点中waitStatus改为-1
int ws = pred.waitStatus;
//外面第一次循环:ws == Node.SIGNAL == -1?ws = 0,所以不会进入到这里面
//外面第二次循环:ws == Node.SIGNAL == -1?ws = -1,所以会进入到这里面
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
//返回true,带着这个true,回到代码块5中
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//外面第一次循环:会来到这里,通过CAS将head节点中的waitStatus 改 -1,然后返回false,带着false回到代码块5中
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
总结:T2在经过一次获取锁,两次自旋后还是没有获得到锁,初始化了head哨兵节点,并将head节点中的waitStatus = -1 ,T2进入了睡眠,等待唤醒;
此时,T1还是没有执行完,T3又来竞争锁:
代码块1:
public final void acquire(int arg) {
//还是通过tryAcquire尝试获得锁,这里不在分析tryAcquire,因为T1还没有被执行完,T2正在睡眠,tryAcquire还是没有获得到锁,返回false,则进入代码块2
//代码块2中,addWaiter成功将当前T3表示的node加入到链表尾部,并返回当前node
//带着当前线程T3表示的node进入到acquireQueued,这里面其实就是自旋和当前是否休眠的操作,请看代码块3
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
代码块2:
private Node addWaiter(Node mode) {
//将当前线程封装为Node对象,这个Node当中维护了当前线程的状态、前一个节点,后一个节点,当前线程的引用等等
Node node = new Node(Thread.currentThread(), mode);
//因为T2刚刚已经初始化了AQS队列,并创建了head哨兵节点,同时将tail 设置了等于T2线程表示的node,所以tail = T2表示的node;
Node pred = tail;
// pred = T2node ,所以这个条件成立,将进入方法
if (pred != null) {
//将当前节点的前驱节点指向T2表示的node
node.prev = pred;
//将tail表示的节点等于当前节点T3表示的node,相当于T3表示node是尾节点
if (compareAndSetTail(pred, node)) {
//pred = T2表示的node,所以这里是将T2表示的node的下一个节点指向当前T3表示的node,这就是一个双向链表的数据结构,即使后面T4,T5。。。也是这样链接起来
pred.next = node;
// 返回当前node,带着node回到代码块1中
return node;
}
}
//当线程第一次初始化AQS队列的时候,也就是队列中还没有任何等待的线程,enq方法很重要,请将目光移到代码块4:
enq(node);
//代码块4执行完,返回了当前线程T2表示的node节点,返回node,带着node回到代码块1中
return node;
}
代码块3:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//这里是一个死循环
for (;;) {
//第一次循环,获得当前节点的前驱节点,也就是T2表示的node节点
//第二次循环,获得当前节点的前驱节点,也就是T2表示的node节点
final Node p = node.predecessor();
//第一次循环,p == head ? 当然不等于,因为现在当前节点的前驱节点是T2表示的node节点,直接返回了false,进入到下面的shouldParkAfterFailedAcquire当中,请看代码块4
//第二次循环,P == head ? 当然不等于,所以再次进入到tryAcquire中尝试获得锁,又一次自旋,假设还是没有获得到锁,因为T1还没有执行完,锁还没有被释放
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//第一次循环,带着当前节点node的前驱节点head,和当前节点node,进入到shouldParkAfterFailedAcquire方法之中,这里很重要,请看代码块4
//第一次循环,从代码块4中的shouldParkAfterFailedAcquire返回了false,那么第一次循环结束
//第二次循环,带着当前节点T3表示的node的前驱节点T2 node,和当前节点T3 node,再次进入到shouldParkAfterFailedAcquire方法之中,请看代码块4
//第二次循环,从代码块4中的shouldParkAfterFailedAcquire返回了true,代表着将进入parkAndCheckInterrupt方法将当前线程休眠,等待被唤醒,代码阻塞在里面,至此,T3进入睡眠;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
代码块4:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//外面第一次循环:pred = T2表示的node , node 为 T2,因为T2刚刚将head节点的waitStatus = -1之后自己进入了睡眠,但是T2的waitStatus还是等于0;
//外面第一次循环:ws = 0 ;
//外面第二次循环:因为在第一次的时候,我们已经将T2 node节点中waitStatus改为-1
int ws = pred.waitStatus;
//外面第一次循环:ws == Node.SIGNAL == -1?ws = 0,所以不会进入到这里面
//外面第二次循环:ws == Node.SIGNAL == -1?ws = -1,所以会进入到这里面
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
//返回true,带着这个true,回到代码块3中
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//外面第一次循环:会来到这里,通过CAS将head节点中的waitStatus 改 -1,然后返回false,带着false回到代码块5中
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
总结:T3 和 T2的过程差不多,只是T2多了个初始化head节点的操作;
至此,当前AQS中,T1正在实行,T2 ,T3正在队列中睡眠,这就是当线程竞争锁不到时排队的情景;
下面我们来看看另外一个场景:
假设T2在获取不到锁的时候,准备加入队列中的时候,T1执行完了,释放了锁,这个时候是怎样的情况?
代码块1:
public final void acquire(int arg) {
//当T1未执行完时,T2线程来了,并且tryAcquire在尝试获得锁失败,在经过addWaiter封装成了node对象进入acquireQueued准备排队,我们来看看此时acquireQueued的情况
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
代码块2:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//由于在addWaiter中已经初始化了head节点,当前node排在head之后
// p = head = node.predecessor()
final Node p = node.predecessor();
// p == head ? 明显是等于的,这个时候进入tryAcquire尝试获得锁,在这之前T1执行完了,所以这里T2获得到了锁,返回true,进入
if (p == head && tryAcquire(arg)) {
//将当前节点node设置为head节点,因为他已经获得锁了,不用在排队了,所以把head=null,让GC回收,当前节点node变为了head
//setHead当中做了三个事情:1.将head等于当前node,2.将当前节点node的前驱节点=null,3.将当前节点的thread等于null
setHead(node);
//因为当前节点node已经变为了head,所以head的next也是等于空
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
以上就是ReentrantLock之公平锁的加锁过程,后续我们会分析加锁过程
总结:
1.公平锁在调用lock的时候总共尝试获得锁几次?
答:
1.如果是第一个线程,则一次就获得成功
2.如果是第二个线程,第一个线程还没有执行完,那么会经过三次尝试获得锁,在得不到锁的情况进入休眠
网友评论