1. ReentrantLock 定义
ReentrantLock 是 JUC 中提供的可中断, 可重入获取, 支持超时, 支持尝试获取锁
它主要有一下特点:
- 可重入, 一个线程获取独占锁后, 可多次获取, 多次释放(synchronized也一样, 只是synchronized内的代码执行异常后会自动释放到monitor上的锁)
- 支持中断(synchronized不支持)
- 支持超时机制, 支持尝试获取lock, 支持公不公平获取lock(主要区别在 判断 AQS 中的 Sync Queue 里面是否有其他线程等待获取 lock)
- 支持调用 Condition 提供的 await(释放lock, 并等待), signal(将线程节点从 Condition Queue 转移到 Sync Queue 里面)
- 在运行 synchronized 里面的代码若抛出异常, 则会自动释放监视器上的lock, 而 ReentrantLock 是需要显示的调用 unlock方法
先看一个demo (这个在Condition 中介绍过)
import org.apache.log4j.Logger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest {
private static final Logger logger = Logger.getLogger(ConditionTest.class);
static final Lock lock = new ReentrantLock();
static final Condition condition = lock.newCondition();
public static void main(String[] args) throws Exception{
final Thread thread1 = new Thread("Thread 1 "){
@Override
public void run() {
lock.lock(); // 线程 1获取 lock
logger.info(Thread.currentThread().getName() + " 正在运行 .....");
try {
Thread.sleep(2 * 1000);
logger.info(Thread.currentThread().getName() + " 停止运行, 等待一个 signal ");
condition.await(); // 调用 condition.await 进行释放锁, 将当前节点封装成一个 Node 放入 Condition Queue 里面, 等待唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(Thread.currentThread().getName() + " 获取一个 signal, 继续执行 ");
lock.unlock(); // 释放锁
}
};
thread1.start(); // 线程 1 线运行
Thread.sleep(1 * 1000);
Thread thread2 = new Thread("Thread 2 "){
@Override
public void run() {
lock.lock(); // 线程 2获取lock
logger.info(Thread.currentThread().getName() + " 正在运行.....");
thread1.interrupt(); // 对线程1 进行中断 看看中断后会怎么样? 结果 线程 1还是获取lock, 并且最后还进行 lock.unlock()操作
try {
Thread.sleep(2 * 1000);
}catch (Exception e){
}
condition.signal(); // 发送唤醒信号 从 AQS 的 Condition Queue 里面转移 Node 到 Sync Queue
logger.info(Thread.currentThread().getName() + " 发送一个 signal ");
logger.info(Thread.currentThread().getName() + " 发送 signal 结束");
lock.unlock(); // 线程 2 释放锁
}
};
thread2.start();
}
}
整个执行步骤
- 线程 1 开始执行, 获取 lock, 然后开始睡眠 2秒
- 当线程1睡眠到 1秒时, 线程2开始执行, 但是lock被线程1获取, 所以 等待
- 线程 1 睡足2秒 调用 condition.await() 进行锁的释放, 并且将 线程1封装成一个 node 放到 condition 的 Condition Queue里面, 等待其他获取锁的线程给他 signal, 或对其进行中断(中断后可以到 Sync Queue里面进而获取 锁)
- 线程 2 获取锁成功, 中断 线程1, 线程被中断后, node 从 Condition Queue 转移到 Sync Queue 里面, 但是 lock 还是被 线程2获取者, 所以 node呆在 Sync Queue 里面等待获取 lock
- 线程 2睡了 2秒, 开始 用signal唤醒 Condition Queue 里面的节点(此时代表 线程1的node已经到 Sync Queue 里面)
- 线程 2释放lock, 并且在 Sync Queue 里面进行唤醒等待获取锁的节点 node
- 线程1 得到唤醒, 获取锁
- 线程1 释放锁
执行结果
[2017-02-08 22:43:09,557] INFO Thread 1 (ConditionTest.java:26) - Thread 1 正在运行 .....
[2017-02-08 22:43:11,565] INFO Thread 1 (ConditionTest.java:30) - Thread 1 停止运行, 等待一个 signal
[2017-02-08 22:43:11,565] INFO Thread 2 (ConditionTest.java:48) - Thread 2 正在运行.....
java.lang.InterruptedException
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:57) - Thread 2 发送一个 signal
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:58) - Thread 2 发送 signal 结束
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
[2017-02-08 22:43:13,567] INFO Thread 1 (ConditionTest.java:35) - Thread 1 获取一个 signal, 继续执行
at com.lami.tuomatuo.search.base.concurrent.aqs.ConditionTest$1.run(ConditionTest.java:31)
AQS重要方法与ReentrantLock的关联
从架构图中可以得知,AQS提供了大量用于自定义同步器实现的Protected方法。自定义同步器实现的相关方法也只是为了通过修改State字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法(ReentrantLock需要实现的方法如下,并不是全部):
一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、
tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。
以非公平锁为例,这里主要阐述一下非公平锁与AQS之间方法的关联之处,具体每一处核心方法的作用会在文章后面详细进行阐述。
这个流程图有错误,欢迎指出哈
为了帮助大家理解ReentrantLock和AQS之间方法的交互过程,以非公平锁为例,我们将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。
加锁:
- 通过ReentrantLock的加锁方法Lock进行加锁操作。
- 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
- AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
- tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。
解锁:
- 通过ReentrantLock的解锁方法Unlock进行解锁。
- Unlock会调用内部类Sync的Release方法,该方法继承于AQS。
- Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
- 释放成功后,所有处理由AQS框架完成,与自定义同步器无关。
通过上面的描述,大概可以总结出ReentrantLock加锁解锁时API层核心方法的映射关系。
2. ReentrantLock 构造函数
ReentrantLock支持公不公平获取锁, 默认使用不公平(吞吐量大)
/**
* Creates an instance of {@code KReentrantLock}
* This is equivalent to using {@code KReentrantLock(false)}
*/
/** 默认的使用非公平的方式创建一个 KReentrantLock */
public KReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code KReentrantLock} with the
* given fairness policy
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
/** 创建 ReentrantLock 通过 fair 指定是否使用公平模式 */
public KReentrantLock(boolean fair){
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock的 lock 获取释放都是通过内部类 Sync 的子类 FairSync, NonfairSync 来实现, 而且两者都是继承 Sync, 而Sync是继承 AQS, 接下来我们看 FairSync 与 NonfairSync
3. 内部类 FairSync 与 NonfairSync
FairSync 与 NonfairSync 都是 AQS 的子类, lock的获取, 释放主逻辑都是交由 AQS来完成, 则子类实现模版方法(也就是模版模式)
/**
* Sync object for non-fair locks
*/
/**
* 继承 Sync 实现非公平
* 公不公平的获取锁的区别:
* 1\. 非公平-> 在获取时先cas改变一下 AQS 的state值, 改变成功就获取, 不然就加入到 AQS 的 Sync Queue 里面
* 2\. 每次获取lock之前判断是否 AQS 里面的 Sync Queue 是否有等待获取的线程
*/
static final class NonfairSync extends Sync{
private static final long serialVersionUID = 7316153563782823691L;
/**
* Perform lock. Try immediate barge, backing up to normal
* acquire on failure
*/
@Override
/**
* 获取 lock
*/
void lock() {
if(compareAndSetState(0, 1)){ // 先cas改变一下 state 成功就表示获取
// 获取成功设置 exclusiveOwnerThread
setExclusiveOwnerThread(Thread.currentThread());
}else{
acquire(1); // 获取不成功, 调用 AQS 的 acquire 进行获取
}
}
/**
* 尝试获取锁
*/
protected final boolean tryAcquire(int acquires){
return nonfairTryAcquire(acquires);
}
}
/**
* Sync object for fair locks
*/
/**
* 继承 Sync的公平的方式获取锁
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
@Override
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first
*/
/**
* 公平的方式获取 锁
*/
protected final boolean tryAcquire(int acquires){
final Thread current = Thread.currentThread(); // 1\. 获取当前的 线程
int c = getState();
if(c == 0){ // 2\. c == 0 -> 现在还没有线程获取锁
if(!hasQueuedPredecessors() && compareAndSetState(0, acquires)){ // 3\. 判断 AQS Sync Queue 里面是否有线程等待获取 锁,若没有 直接 CAS 获取lock
setExclusiveOwnerThread(current); // 4\. 获取 lock 成功 设置 exclusiveOwnerThread
return true;
}
}
else if(current == getExclusiveOwnerThread()){ // 5\. 已经有线程获取锁, 判断是否是当前的线程
int nextc = c + acquires; // 6\. 下面是进行lock 的重入, 就是计数器加 1
if(nextc < 0){
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
}
从代码中, 我们可以看出公平公平主要区别:
- 非公平-> 在获取时先cas改变一下 AQS 的state值, 改变成功就获取, 不然就加入到 AQS 的 Sync Queue 里面
- 每次获取lock之前判断是否 AQS 里面的 Sync Queue 是否有等待获取的线程
4. 内部类 Sync
Sync 继承于AQS, 它主要定义了 lock 获取释放的 nonfairTryAcquire, tryRelease方法
/** Synchronizer providing all implementation mechanics */
/** 代理 ReentrantLock 来进行 lock 的获取与释放*/
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair version below, Uses AQS state to
* represent the number of holds on the lock
*/
/**
* 通过继承 Sync 来实现 公平与非公平
*/
abstract static class Sync extends KAbstractQueuedSynchronizer{
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link :Lock#Lock}. The main reason for subclassing
* is to allow fast path for nonfair version
*/
abstract void lock();
/**
* Performs non-fair tryLock, tryAcquire is implemented in
* subclass, but both need nonfair try for tryLock method
* @param acquires
* @return
*/
/**
* 非公平的尝试获取 lock
*/
final boolean nonfairTryAcquire(int acquires){
final Thread current = Thread.currentThread(); // 1\. 获取当前的线程
int c = getState(); // 2\. 获取 aqs 中的 state(代表 独占锁 是否被获取)
if(c == 0){ // 3\. c == 0 独占锁 没有被人获取
if(compareAndSetState(0, acquires)){ // 4\. CAS 改变 state 获取锁(这里有可能有竞争, 有可能失败)
setExclusiveOwnerThread(current); // 5\. 获取 lock 成功, 设置获取锁的独占线程
return true; // 6\. 直接返回 true
}
}
else if(current == getExclusiveOwnerThread()){// 7\. 判断是否现在获取 独占锁的线程是本线程
int nextc = c + acquires; // 8\. 在 state 计数加1(重入获取锁)
if(nextc < 0){ // overflow
throw new Error("Maximum lock count exceeded");
}
setState(nextc); // 9\. 这里因为是已经获取 lock 所以不考虑并发
return true;
}
return false;
}
/**
* 释放锁 从 AQS 里面获取到的值
*/
protected final boolean tryRelease(int releases){
int c = getState() - releases; // 1\. 释放 releases (因为支持重入, 所以这里的 c 可能不为 0)
if(Thread.currentThread() != getExclusiveOwnerThread()){// 2\. 判断当前的线程是否是获取独占锁的线程
throw new IllegalMonitorStateException();
}
boolean free = false;
if(c == 0){ // 3\. lock 完全释放
free = true;
setExclusiveOwnerThread(null); // 4\. 置空 exclusiveOwnerThread
}
setState(c);
return free;
}
/**
* 判断当前的线程是否是获取 独占锁的线程
*/
protected final boolean isHeldExclusively(){
/**
* While we must in general read state before owner,
* we don't need to do so to check if current thread is owner
*/
return getExclusiveOwnerThread() == Thread.currentThread();
}
final KAbstractQueuedSynchronizer.ConditionObject newCondition(){
return new ConditionObject();
}
/********************** Methods relayed from outer class **************************/
/**
* 获取 独占锁的获取者
*/
final Thread getOwner(){
return getState() == 0 ? null : getExclusiveOwnerThread();
}
/**
* 返回 锁被获取的次数
*/
final int getHoldCount(){
return isHeldExclusively()? getState() : 0;
}
/**
* 判断锁是否被获取了
*/
final boolean isLocked(){
return getState() != 0;
}
/**
* Reconsititues the instance from a stream (that is, desrializes it)
*/
private void readObject(ObjectInputStream s) throws Exception{
s.defaultReadObject();
setState(0);
}
}
nonfairTryAcquire, tryRelease方法都是获取 lock 的模版方法, 主逻辑在 AQS 里面, 下面会详细说明
5. 获取lock方法 lock()
我们这里以 非公平模式详细说明
# Reentrant 中
public void lock(){
sync.lock();
}
# NonfairSync
void lock() {
if(compareAndSetState(0, 1)){ // 先cas改变一下 state 成功就表示获取
setExclusiveOwnerThread(Thread.currentThread()); // 获取成功设置 exclusiveOwnerThread
}else{
acquire(1); // 获取不成功, 调用 AQS 的 acquire 进行获取
}
}
# FairSync中
final void lock() {
acquire(1);
}
从上诉代码中我们可以看到最终都调用了AQS的 acquire 方法
6. AQS 获取lock方法 acquire
这是一个典型的模版模式, 主逻辑定了了(acquire), 次逻辑交由子类 FairSync, NoFairSync 来实现
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire(int)},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking can be used
* to implement method {@link "Lock#lock}
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire(int)} but is otherwise uninterpreted and
* can represent anything you like
*/
/** acquire 是用于获取锁的最常用的模式
* 步骤
* 1\. 调用 tryAcquire 尝试性的获取锁(一般都是又子类实现), 成功的话直接返回
* 2\. tryAcquire 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号
* 3\. 调用 acquireQueued 进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking)
* 4\. 根据acquireQueued的返回值判断在获取lock的过程中是否被中断, 若被中断, 则自己再中断一下(selfInterrupt)
*
*/
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
acquireXXX方法是用于获取锁的最常用的模式
步骤:
1. 调用 tryAcquire 尝试性的获取锁(一般都是又子类实现), 成功的话直接返回
2. tryAcquire 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号
3. 调用 acquireQueued 进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking)
4. 根据acquireQueued的返回值判断在获取lock的过程中是否被中断, 若被中断, 则自己再中断一下(selfInterrupt)
下面我们细分的讲解tryAcquire反正都是交由子类完成, 若这时没有成功, 则将节点入队列
7. addWaiter将节点加入到 Sync Queue
当Sync为空时直接 添加到 tail, 若不为空, 再调用 enq方法
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
/**
* 将当前的线程封装成 Node 加入到 Sync Queue 里面
*/
private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(), mode); // 1\. 封装 Node
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if(pred != null){ // 2\. pred != null -> 队列中已经有节点, 直接 CAS 到尾节点
node.prev = pred; // 3\. 先设置 Node.pre = pred (PS: 则当一个 node在Sync Queue里面时 node.prev 一定 != null(除 dummy node), 但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 )
if(compareAndSetTail(pred, node)){ // 4\. CAS node 到 tail
pred.next = node; // 5\. CAS 成功, 将 pred.next = node (PS: 说明 node.next != null -> 则 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null)
return node;
}
}
enq(node); // 6\. 队列为空, 调用 enq 入队列
return node;
}
主要的流程如下:
(1)通过当前的线程和锁模式新建一个节点。
(2)Pred指针指向尾节点Tail。
(3)将New中Node的Prev指针指向Pred。
(4)通过compareAndSetTail方法,完成尾节点的设置。这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
static {
try {
stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception ex) {
throw new Error(ex);
}
}
从AQS的静态代码块可以看出,都是获取一个对象的属性相对于该对象在内存当中的偏移量,这样我们就可以根据这个偏移量在对象内存当中找到这个属性。tailOffset指的是tail对应的偏移量,所以这个时候会将new出来的Node置为当前队列的尾节点。同时,由于是双向链表,也需要将前一个节点指向尾节点。
8. enq 将节点加入到 Sync Queue
如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要看一下Enq的方法。
Sync Queue 是一个支持并发操作, 双向的队列, 并且它里面始终存在一个 dummy 节点(主要是为了节省head, tail的判空, 减少代码复杂度)
/**
* 这个插入会检测head tail 的初始化, 必要的话会初始化一个 dummy 节点, 这个和 ConcurrentLinkedQueue 一样的
* Insert node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor 返回的是前继节点
*/
/**
* 将节点 node 加入队列
* 这里有个注意点
* 情况:
* 1. 首先 queue是空的
* 2. 初始化一个 dummy 节点
* 3. 这时再在tail后面添加节点(这一步可能失败, 可能发生竞争被其他的线程抢占)
* 这里为什么要加入一个 dummy 节点呢?
* 这里的 Sync Queue 是CLH lock的一个变种, 线程节点 node 能否获取lock的判断通过其前继节点
* 而且这里在当前节点想获取lock时通常给前继节点 打上 signal 的标识(表示当前继节点释放lock需要通知我来获取lock)
* 若这里不清楚的同学, 请先看看 CLH lock的资料 (这是理解 AQS 的基础)
*/
private Node enq(final Node node){
for(;;){
Node t = tail;
if(t == null){ // Must initialize // 1. 队列为空 初始化一个 dummy 节点 其实和 ConcurrentLinkedQueue 一样
if(compareAndSetHead(new Node())){ // 2. 初始化 head 与 tail (这个CAS成功后, head 就有值了, 详情将 Unsafe 操作)
tail = head;
}
}else{
node.prev = t; // 3. 先设置 Node.pre = pred (PS: 则当一个 node在Sync Queue里面时 node.prev 一定 != null, 但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 )
if(compareAndSetTail(t, node)){ // 4. CAS node 到 tail
t.next = node; // 5. CAS 成功, 将 pred.next = node (PS: 说明 node.next != null -> 则 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null)
return t;
}
}
}
}
大家若对 dummy 节点感兴趣, 可以先看看 CLH lock的实现
如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。
总结一下,线程获取锁的时候,过程大体如下:
a. 当没有线程获取到锁时,线程1获取锁成功。
b. 线程2申请锁,但是锁被线程1占有。
c. 如果再有线程要获取锁,依次在队列中往后排队即可。
回到上边的代码,hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回False,说明当前线程可以争取共享资源;如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。
hasQueuedPredecessors判断是否需要排队的源码分析
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
/**
* h != t 判断首不等于尾分三种情况
* 1、队列没有初始化,也就是第一个线程tf来加锁的时候那么这个时候队列没有初始化,
* h和t都是null,返回(false)
*
* 2、队列被初始化了,但是里面只有一个数据;什么情况下才会出现这种情况呢?ts加锁的时候里面就只有一个数据?
* 其实不是,因为队列初始化的时候会虚拟一个h作为头结点,tc=ts作为第一个排队的节点;tf为持有锁的节点
* 为什么这么做呢?因为AQS认为h永远是不排队的,假设你不虚拟节点出来那么ts就是h,
* 而ts其实需要排队的,因为这个时候tf可能没有执行完,还持有着锁,ts得不到锁,故而他需要排队;
* 那么为什么要虚拟为什么ts不直接排在tf之后呢,上面已经时说明白了,tf来上锁的时候队列都没有,他不进队列,
* 故而ts无法排在tf之后,只能虚拟一个thread=null的节点出来(Node对象当中的thread为null);
* 那么问题来了;究竟什么时候会出现队列当中只有一个数据呢?假设原队列里面有5个人在排队,当前面4个都执行完了
* 轮到第五个线程得到锁的时候;他会把自己设置成为头部,而尾部又没有,故而队列当中只有一个h就是第五个
* 至于为什么需要把自己设置成头部;其实已经解释了,因为这个时候五个线程已经不排队了,他拿到锁了;
* 所以他不参与排队,故而需要设置成为h;即头部;所以这个时间内,队列当中只有一个节点
* 关于加锁成功后把自己设置成为头部的源码,后面会解析到;继续第三种情况的代码分析
* 记得这个时候队列已经初始化了,但是只有一个数据,并且这个数据所代表的线程是持有锁
* h != t false 由于后面是&&运算,故而返回false
*
* 3、队列被初始化了,此时队列中元素大于1,那么h!=t则成立)
* 继续判断把h.next赋值给s;
* 因为h也就是对头对应的Node对象或者线程他是持有锁的,但是不参与排队;
* 因为h要么是虚拟出来的节点,要么是持有锁的节点;下文分析
* 假设队列大于1个,那么肯定不成立(s==null---->false)
* 由于是||运算如果返回false,还要判断s.thread != Thread.currentThread();分两种情况
* 3.1 s.thread != Thread.currentThread() 返回true,就是当前线程不等于在排队的第一个线程s;
* 这个时候整体结果就是true; 所以需要去排队
* 3.2 s.thread != Thread.currentThread() 返回false 表示当前来参与竞争锁的线程和第一个排队的线程是同一个线程
* 这个时候整体结果就是false,所以不需要去排队 **/
}
看到这里,我们理解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());为什么要判断的头结点的下一个节点?第一个节点储存的数据是什么?
双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。当h != t时:
a. 如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,需要返回True(这块具体见下边代码分析)。
b. 如果(s = h.next) != null,说明此时队列中至少有一个有效节点。如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#enqif (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head;} else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; }}
节点入队不是原子操作,所以会出现短暂的head != tail,此时Tail指向最后一个节点,而且Tail指向Head。如果Head没有指向Tail(可见5、6、7行),这种情况下也需要将相关线程加入队列中。所以这块代码是为了解决极端情况下的并发问题。
9. acquireQueued 线程获取锁方法
主逻辑:
- 当当前节点的前继节点是head节点时先 tryAcquire获取一下锁, 成功的话设置新 head, 返回
- 第一步不成功, 检测是否需要sleep, 需要的话就 sleep, 等待前继节点在释放lock时唤醒 或通过中断来唤醒
- 整个过程可能需要blocking nonblocking 几次
/**
-
Acquires in exclusive uninterruptible mode for thread already in
-
queue. Used by condition wait methods as well as acquire
-
@param node the node
-
@param arg the acquire argument
-
@return {@code} if interrupted while waiting
/ /*
- 不支持中断的获取锁
*/ final boolean acquireQueued(final Node node, int arg){ boolean failed = true; try { boolean interrupted = false; for(;;){ final Node p = node.predecessor(); // 1. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null) if(p == head && tryAcquire(arg)){ // 2. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下 setHead(node); // 3. 获取 lock 成功, 直接设置 新head(原来的head可能就直接被回收) p.next = null; // help GC // help gc failed = false; return interrupted; // 4. 返回在整个获取的过程中是否被中断过 ; 但这又有什么用呢? 若整个过程中被中断过, 则最后我在 自我中断一下 (selfInterrupt), 因为外面的函数可能需要知道整个过程是否被中断过 } if(
shouldParkAfterFailedAcquire(p, node) && // 5. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal)) parkAndCheckInterrupt()){ // 6. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒 interrupted = true; } } }finally { if(failed){ // 7. 在整个获取中出错 cancelAcquire(node); // 8. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除) } } }
代码中的 tryAcquire 都是交由子类 FairSync, Sync, NonFairSync来完成,
注:setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivate void setHead(Node node) { head = node; node.thread = null; node.prev = null;}
我们下面来看一下
shouldParkAfterFailedAcquire, parkAndCheckInterrupt, cancelAcquire方法.
10. shouldParkAfterFailedAcquire 判断线程是否阻塞方法
本节点在进行 sleep 前一定需要给 前继节点打上 SIGNAL 标识(
因为前继节点在 release lock 时会根据 这个标识决定是否需要唤醒后继节点来获取 lock,
若释放时 标识是0, 则说明 Sync queue 里面没有等待获取lock的线程, 或Sync queue里面的节点正在获取 lock)
一般流程:
- 第一次进入此方法 前继节点状态是 0, 则 CAS 为SIGNAL 返回 false(干嘛返回的是FALSE <- 主要是为了再次 tryAcquire 一下, 说不定就能获取锁呢)
- 第二次进来 前继节点标志为SIGNAL, ok, 标识好了, 这下就可以安心睡觉, 不怕前继节点在释放lock后不进行唤醒我了
/**
-
Checks and update status for a node that failed to acquire.
-
Returns true if thread should block. This is the main signal
-
control in all acquire loops. Requires that pred == node.prev.
-
@param pred node's predecessor holding status
-
@param node the node
-
@return {@code true} if thread should block
/ // private static boolean
shouldParkAfterFailedAcquire(Node pred, Node node){ int ws = pred.waitStatus; if(ws == Node.SIGNAL){ // 1. 判断是否已经给前继节点打上标识SIGNAL, 为前继节点释放 lock 时唤醒自己做准备 / * This node has already set status asking a release * to signal it, so it can safely park */ return true; }
if(ws > 0){ // 2\. 遇到个 CANCELLED 的节点 (ws > 0 只可能是 CANCELLED 的节点, 也就是 获取中被中断, 或超时的节点)
/** // 这里我们帮助删除
* Predecessor was cancelled. Skip over predecessors and
* indicate retry
*/
do{
node.prev = pred = pred.prev; // 3\. 跳过所有 CANCELLED 的节点
}while(pred.waitStatus > 0);
pred.next = node; // 跳过 CANCELLED 节点
}
else{
/**
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 4\. 到这里 ws 只可能是 0 或 PROPAGATE (用于 共享模式的, 所以在共享模式中的提醒前继节点唤醒自己的方式,
// 也是给前继节点打上 SIGNAL标识 见 方法 "doReleaseShared" -> "!compareAndSetWaitStatus(h, Node.SIGNAL, 0)" -> unparkSuccessor)
}
return false;
}
11. parkAndCheckInterrupt 方法
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
/**
* 中断当前线程, 并且返回此次的唤醒是否是通过中断
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
logger.info(Thread.currentThread().getName() + " " + "parkAndCheckInterrupt , ThreadName:" + Thread.currentThread().getName());
return Thread.interrupted(); // Thread.interrupted() 会清除中断标识, 并返上次的中断标识
}
上述方法的流程图如下:
从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(
shouldParkAfterFailedAcquire流程):
从队列中释放节点的疑虑打消了,那么又有新问题了:
- shouldParkAfterFailedAcquire中取消节点是怎么生成的呢?什么时候会把一个节点的waitStatus设置为-1?
- 是在什么时间释放节点通知到被挂起的线程呢?
12. cancelAcquire 删除取消的节点
acquireQueued方法中的Finally代码:
// java.util.concurrent.locks.AbstractQueuedSynchronizerfinal boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { ... for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { ... failed = false; ... } ... } finally { if (failed) cancelAcquire(node); }}
通过cancelAcquire方法,将Node的状态标记为CANCELLED。接下来,我们逐行来分析这个方法的原理:
清除因中断/超时而放弃获取lock的线程节点(此时节点在 Sync Queue 里面)
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
/**
* 清除因中断/超时而放弃获取lock的线程节点(此时节点在 Sync Queue 里面)
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null; // 1\. 线程引用清空
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0) // 2\. 若前继节点是 CANCELLED 的, 则也一并清除
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next; // 3\. 这里的 predNext也是需要清除的(只不过在清除时的 CAS 操作需要 它)
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED; // 4\. 标识节点需要清除
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { // 5\. 若需要清除额节点是尾节点, 则直接 CAS pred为尾节点
compareAndSetNext(pred, predNext, null); // 6\. 删除节点predNext
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL || // 7\. 后继节点需要唤醒(但这里的后继节点predNext已经 CANCELLED 了)
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 8\. 将 pred 标识为 SIGNAL
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 8\. next.waitStatus <= 0 表示 next 是个一个想要获取lock的节点
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node); // 若 pred 是头节点, 则此刻可能有节点刚刚进入 queue ,所以进行一下唤醒
}
node.next = node; // help GC
}
}
当前的流程:
- 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。
- 根据当前节点的位置,考虑以下三种情况:
- 当前节点是尾节点。
- 当前节点是Head的后继节点。
- 当前节点不是Head的后继节点,也不是尾节点。
根据上述第二条,我们来分析每一种情况的流程。
当前节点是尾节点。
当前节点是Head的后继节点。
当前节点不是Head的后继节点,也不是尾节点。
通过上面的流程,我们对于CANCELLED节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作呢?什么情况下会对Prev指针进行操作?
(1)执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的
shouldParkAfterFailedAcquire方法了),如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。(2)
shouldParkAfterFailedAcquire方法中,会执行下面的代码,其实就是在处理Prev指针。shouldParkAfterFailedAcquire是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); 复制代码
至此整个 不响应中断的获取lock过程就 OK 了, 下面我们来看一下, 响应中断的获取锁, 尝试性的获取lock, 带超时的获取lock
13. ReentrantLock 释放 lock
#Reentrant 中
/**
* 释放 lock
*/
public void unlock(){
sync.release(1);
}
#AQS 中
public final boolean release(int arg){
if(tryRelease(arg)){ // 1\. 调用子类, 若完全释放好, 则返回true(这里有lock重复获取)
Node h = head;
if(h != null && h.waitStatus != 0){ // 2\. h.waitStatus !=0 其实就是 h.waitStatus < 0 后继节点需要唤醒
unparkSuccessor(h); // 3\. 唤醒后继节点
}
return true;
}
return false;
}
在ReentrantLock里面的公平锁和非公平锁的父类Sync定义了可重入锁的释放锁机制。
// java.util.concurrent.locks.ReentrantLock.Sync
// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
我们来解释下述源码:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
// 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
if (tryRelease(arg)) {
// 获取头结点
Node h = head;
// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这里的判断条件为什么是h != null && h.waitStatus != 0?
(1)h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
(2)h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
(3)h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
在看一下unparkSuccessor方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void unparkSuccessor(Node node) {
// 获取头结点waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一个节点
Node s = node.next;
// 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
if (s != null)
LockSupport.unpark(s.thread);
}
为什么要从后往前找第一个非Cancelled的节点呢?原因如下。
之前的addWaiter方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred; compareAndSetTail(pred, node) 这两个地方可以看作Tail入队的原子操作,但是此时pred.next = node;还没执行,如果这个时候执行了unparkSuccessor方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node。
综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和CANCELLED节点产生过程中断开Next指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。继续执行acquireQueued方法以后,中断如何处理?
唤醒后,会执行return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
再回到acquireQueued代码,当parkAndCheckInterrupt返回True或者False的时候,interrupted的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前interrupted返回。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果acquireQueued为True,就会执行selfInterrupt方法。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:
(1) 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次。
(2) 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理,感兴趣的同学可以看下ThreadPoolExecutor源码。
14. 响应中断的获取 lock
此方法与不响应的唯一区别时, 遇到线程中断直接抛出异常, 获取失败
#Reentrant中
/**
* 带中断的获取锁(被其他线程中断后就直接返回)
*/
public void lockInterruptibly() throws InterruptedException{
sync.acquireInterruptibly(1);
}
#AQS 中的
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implement by the first checking interrupt stats, then invoking
* at least once {@link #tryAcquire(int)}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire(int)}
* until success or the thread is interrupted. This method can be
* used to implement method {@link "Lock#lockInterruptibly}
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire(int)} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
/**
* 支持中断的获取 lock ,若被中断 则直接 放弃(aborting)
*/
public final void acquireInterruptibly(int arg) throws InterruptedException {
if(Thread.interrupted()){ // 1\. 判断线程是否被终止
throw new InterruptedException();
}
if(!tryAcquire(arg)){ // 2\. 尝试性的获取锁
doAcquireInterruptibly(arg); // 3\. 获取锁不成功, 直接加入到 Sync Queue 里面(这里的加入操作在doAcquireInterruptibly里面)
}
}
#AQS 中的
/**
* Acquire in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg) throws InterruptedException{
final Node node = addWaiter(Node.EXCLUSIVE); // 1\. 将当前的线程封装成 Node 加入到 Sync Queue 里面
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor(); // 2\. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
if(p == head && tryAcquire(arg)){ // 3\. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if(shouldParkAfterFailedAcquire(p, node) && // 4\. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
parkAndCheckInterrupt()){ // 5\. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒
throw new InterruptedException(); // 6\. 线程此时唤醒是通过线程中断, 则直接抛异常
}
}
}finally {
if(failed){ // 7\. 在整个获取中出错(比如线程中断)
cancelAcquire(node); // 8\. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
}
}
}
15. 响应中断及超时的获取 lock
此方法与不响应的唯一区别时, 遇到线程中断/超时直接抛出异常, 获取失败
#Reentrant 中
/**
* 带中断 及 timeout 的获取锁 (线程被中断或获取超时就直接 return )
*/
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException{
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
#AQS 中
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
if(Thread.interrupted()){ // 1\. 判断线程是否被终止
throw new InterruptedException();
}
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); // 2\. 尝试性的获取锁, 获取锁不成功, 直接加入到 Sync Queue 里面(这里的加入操作在doAcquireNanos里面)
}
#AQS 中
/**
* Acquire in exclusive timed mode
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
if(nanosTimeout <= 0L){
return false;
}
final long deadline = System.nanoTime() + nanosTimeout; // 0\. 计算截至时间
final Node node = addWaiter(Node.EXCLUSIVE); // 1\. 将当前的线程封装成 Node 加入到 Sync Queue 里面
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor(); // 2\. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
if(p == head && tryAcquire(arg)){ // 3\. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime(); // 4\. 计算还剩余的时间
if(nanosTimeout <= 0L){ // 5\. 时间超时, 直接返回
return false;
}
if(shouldParkAfterFailedAcquire(p, node) && // 6\. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
nanosTimeout > spinForTimeoutThreshold){ // 7\. 若没超时, 并且大于spinForTimeoutThreshold, 则线程 sleep(小于spinForTimeoutThreshold, 则直接自旋, 因为效率更高 调用 LockSupport 是需要开销的)
LockSupport.parkNanos(this, nanosTimeout);
}
if(Thread.interrupted()){ // 8\. 线程此时唤醒是通过线程中断, 则直接抛异常
throw new InterruptedException();
}
}
}finally {
if(failed){ // 9\. 在整个获取中出错(比如线程中断/超时)
cancelAcquire(node); // 10\. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
}
}
}
16. ReentrantLock 一般方法
/**
* 创建 Condition
*/
public Condition newCondition(){
return sync.newCondition();
}
/**
* lock 被获取的次数
*/
public int getHoldCount(){
return sync.getHoldCount();
}
/**
* lock是否被当前的线程获取
*/
public boolean isHeldByCurrentThread(){
return sync.isHeldExclusively();
}
/**
* 锁是否被获取
*/
public boolean isLocked(){
return sync.isLocked();
}
/**
* 是否是公平模式
*/
public final boolean isFair(){
return sync instanceof FairSync;
}
/**
* 获取持有lock的线程
*/
protected Thread getOwer(){
return sync.getOwner();
}
/**
* 是否有线程等待获取lock
*/
public final boolean hasQueuedThreads(){
return sync.hasQueuedThreads();
}
/**
* 当前的线程是否在 AQS Sync Queue 里面等待获取lock
*/
public final boolean hasQueuedThread(Thread thread){
return sync.isQueued(thread);
}
/**
* AQS Sync Queue 里面等待获取锁的线程的长度
*/
public final int getQueueLength(){
return sync.getQueueLength();
}
/**
* AQS Sync Queue 里面等待获取锁的线程
*/
protected Collection<Thread> getQueuedThreads(){
return sync.getQueuedThreads();
}
/**
* 是否有线程在 Condition Queue 里面等待获取锁
*/
public boolean hasWaiters(Condition condition){
if(condition == null){
throw new NullPointerException();
}
if(!(condition instanceof KAbstractQueuedSynchronizer.ConditionObject)){
throw new IllegalArgumentException(" not owber ");
}
return sync.hasWaiters((KAbstractQueuedSynchronizer.ConditionObject)condition);
}
/**
* Condition Queue 里面等待获取锁的长度
*/
public int getWaitQueueLength(Condition condition){
if(condition == null){
throw new NullPointerException();
}
if(!(condition instanceof KAbstractQueuedSynchronizer.ConditionObject)){
throw new IllegalArgumentException("not owner");
}
return sync.getWaitQueueLength((KAbstractQueuedSynchronizer.ConditionObject)condition);
}
/**
* Condition Queue 里面等待获取锁的线程
*/
protected Collection<Thread> getWaitingThreads(Condition condition){
if(condition == null){
throw new NullPointerException();
}
if(!(condition instanceof KAbstractQueuedSynchronizer.ConditionObject)){
throw new IllegalArgumentException("not owner");
}
return sync.getWaitingThreads((KAbstractQueuedSynchronizer.ConditionObject)condition);
}
17. 总结
ReentrantLock 是JUC中使用频繁的一个lock, 而且掌握它有助于理解lock的设计方式方法, 当需要完全理解它, 可能还需要弄懂 Condition 与
AbstractQueuedSynchronizer
作者:hsfxuebao
链接:https://juejin.cn/post/6941175066180714527
来源:掘金
网友评论