1. ReentrantLock 定义
ReentrantLock 是 JUC 中提供的可中断, 可重入获取, 支持超时, 支持尝试获取锁
它主要有一下特点:
-
可重入, 一个线程获取独占锁后, 可多次获取, 多次释放(synchronized也一样, 只是synchronized内的代码执行异常后会自动释放到monitor上的锁)
-
支持中断(synchronized不支持)
-
支持超时机制, 支持尝试获取lock, 支持公不公平获取lock(主要区别在 判断 AQS 中的 Sync Queue 里面是否有其他线程等待获取 lock)
-
支持调用 Condition 提供的 await(释放lock, 并等待), signal(将线程节点从 Condition Queue 转移到 Sync Queue 里面)
-
在运行 synchronized 里面的代码若抛出异常, 则会自动释放监视器上的lock, 而 ReentrantLock 是需要显示的调用 unlock方法
先看一个demo (这个在Condition 中介绍过)
import org.apache.log4j.Logger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by xujiankang on 2017/2/8.
*/
public class ConditionTest {
private static final Logger logger = Logger.getLogger(ConditionTest.class);
static final Lock lock = new ReentrantLock();
static final Condition condition = lock.newCondition();
public static void main(String[] args) throws Exception{
final Thread thread1 = new Thread("Thread 1 "){
@Override
public void run() {
lock.lock(); // 线程 1获取 lock
logger.info(Thread.currentThread().getName() + " 正在运行 .....");
try {
Thread.sleep(2 * 1000);
logger.info(Thread.currentThread().getName() + " 停止运行, 等待一个 signal ");
condition.await(); // 调用 condition.await 进行释放锁, 将当前节点封装成一个 Node 放入 Condition Queue 里面, 等待唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(Thread.currentThread().getName() + " 获取一个 signal, 继续执行 ");
lock.unlock(); // 释放锁
}
};
thread1.start(); // 线程 1 线运行
Thread.sleep(1 * 1000);
Thread thread2 = new Thread("Thread 2 "){
@Override
public void run() {
lock.lock(); // 线程 2获取lock
logger.info(Thread.currentThread().getName() + " 正在运行.....");
thread1.interrupt(); // 对线程1 进行中断 看看中断后会怎么样? 结果 线程 1还是获取lock, 并且最后还进行 lock.unlock()操作
try {
Thread.sleep(2 * 1000);
}catch (Exception e){
}
condition.signal(); // 发送唤醒信号 从 AQS 的 Condition Queue 里面转移 Node 到 Sync Queue
logger.info(Thread.currentThread().getName() + " 发送一个 signal ");
logger.info(Thread.currentThread().getName() + " 发送 signal 结束");
lock.unlock(); // 线程 2 释放锁
}
};
thread2.start();
}
}
整个执行步骤
-
线程 1 开始执行, 获取 lock, 然后开始睡眠 2秒
-
当线程1睡眠到 1秒时, 线程2开始执行, 但是lock被线程1获取, 所以 等待
-
线程 1 睡足2秒 调用 condition.await() 进行锁的释放, 并且将 线程1封装成一个 node 放到 condition 的 Condition Queue里面, 等待其他获取锁的线程给他 signal, 或对其进行中断(中断后可以到 Sync Queue里面进而获取 锁)
-
线程 2 获取锁成功, 中断 线程1, 线程被中断后, node 从 Condition Queue 转移到 Sync Queue 里面, 但是 lock 还是被 线程2获取者, 所以 node呆在 Sync Queue 里面等待获取 lock
-
线程 2睡了 2秒, 开始 用signal唤醒 Condition Queue 里面的节点(此时代表 线程1的node已经到 Sync Queue 里面)
-
线程 2释放lock, 并且在 Sync Queue 里面进行唤醒等待获取锁的节点 node
-
线程1 得到唤醒, 获取锁
-
线程1 释放锁
执行结果
[2017-02-08 22:43:09,557] INFO Thread 1 (ConditionTest.java:26) - Thread 1 正在运行 .....
[2017-02-08 22:43:11,565] INFO Thread 1 (ConditionTest.java:30) - Thread 1 停止运行, 等待一个 signal
[2017-02-08 22:43:11,565] INFO Thread 2 (ConditionTest.java:48) - Thread 2 正在运行.....
java.lang.InterruptedException
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:57) - Thread 2 发送一个 signal
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:58) - Thread 2 发送 signal 结束
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
[2017-02-08 22:43:13,567] INFO Thread 1 (ConditionTest.java:35) - Thread 1 获取一个 signal, 继续执行
at com.lami.tuomatuo.search.base.concurrent.aqs.ConditionTest$1.run(ConditionTest.java:31)
2. ReentrantLock 构造函数
ReentrantLock支持公不公平获取锁, 默认使用不公平(吞吐量大)
/**
* Creates an instance of {@code KReentrantLock}
* This is equivalent to using {@code KReentrantLock(false)}
*/
/** 默认的使用非公平的方式创建一个 KReentrantLock */
public KReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code KReentrantLock} with the
* given fairness policy
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
/** 创建 ReentrantLock 通过 fair 指定是否使用公平模式 */
public KReentrantLock(boolean fair){
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock的 lock 获取释放都是通过内部类 Sync 的子类 FairSync, NonfairSync 来实现, 而且两者都是继承 Sync, 而Sync是继承 AQS, 接下来我们看 FairSync 与 NonfairSync
3. ReentrantLock 内部类 FairSync 与 NonfairSync
FairSync 与 NonfairSync 都是 AQS 的子类, lock的获取, 释放主逻辑都是交由 AQS来完成, 则子类实现模版方法(也就是模版模式)
/**
* Sync object for non-fair locks
*/
/**
* 继承 Sync 实现非公平
* 公不公平的获取锁的区别:
* 1\. 非公平-> 在获取时先cas改变一下 AQS 的state值, 改变成功就获取, 不然就加入到 AQS 的 Sync Queue 里面
* 2\. 每次获取lock之前判断是否 AQS 里面的 Sync Queue 是否有等待获取的线程
*/
static final class NonfairSync extends Sync{
private static final long serialVersionUID = 7316153563782823691L;
/**
* Perform lock. Try immediate barge, backing up to normal
* acquire on failure
*/
@Override
/**
* 获取 lock
*/
void lock() {
if(compareAndSetState(0, 1)){ // 先cas改变一下 state 成功就表示获取
setExclusiveOwnerThread(Thread.currentThread()); // 获取成功设置 exclusiveOwnerThread
}else{
acquire(1); // 获取不成功, 调用 AQS 的 acquire 进行获取
}
}
/**
* 尝试获取锁
*/
protected final boolean tryAcquire(int acquires){
return nonfairTryAcquire(acquires);
}
}
/**
* Sync object for fair locks
*/
/**
* 继承 Sync的公平的方式获取锁
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
@Override
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first
*/
/**
* 公平的方式获取 锁
*/
protected final boolean tryAcquire(int acquires){
final Thread current = Thread.currentThread(); // 1\. 获取当前的 线程
int c = getState();
if(c == 0){ // 2\. c == 0 -> 现在还没有线程获取锁
if(!hasQueuedPredecessors() && compareAndSetState(0, acquires)){ // 3\. 判断 AQS Sync Queue 里面是否有线程等待获取 锁,若没有 直接 CAS 获取lock
setExclusiveOwnerThread(current); // 4\. 获取 lock 成功 设置 exclusiveOwnerThread
return true;
}
}
else if(current == getExclusiveOwnerThread()){ // 5\. 已经有线程获取锁, 判断是否是当前的线程
int nextc = c + acquires; // 6\. 下面是进行lock 的重入, 就是计数器加 1
if(nextc < 0){
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
}
从代码中, 我们可以看出公平公平主要区别:
-
非公平-> 在获取时先cas改变一下 AQS 的state值, 改变成功就获取, 不然就加入到 AQS 的 Sync Queue 里面
-
每次获取lock之前判断是否 AQS 里面的 Sync Queue 是否有等待获取的线程
4. ReentrantLock 内部类 Sync
Sync 继承于AQS, 它主要定义了 lock 获取释放的 nonfairTryAcquire, tryRelease方法
/** Synchronizer providing all implementation mechanics */
/** 代理 ReentrantLock 来进行 lock 的获取与释放*/
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair version below, Uses AQS state to
* represent the number of holds on the lock
*/
/**
* 通过继承 Sync 来实现 公平与非公平
*/
abstract static class Sync extends KAbstractQueuedSynchronizer{
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link :Lock#Lock}. The main reason for subclassing
* is to allow fast path for nonfair version
*/
abstract void lock();
/**
* Performs non-fair tryLock, tryAcquire is implemented in
* subclass, but both need nonfair try for tryLock method
* @param acquires
* @return
*/
/**
* 非公平的尝试获取 lock
*/
final boolean nonfairTryAcquire(int acquires){
final Thread current = Thread.currentThread(); // 1\. 获取当前的线程
int c = getState(); // 2\. 获取 aqs 中的 state(代表 独占锁 是否被获取)
if(c == 0){ // 3\. c == 0 独占锁 没有被人获取
if(compareAndSetState(0, acquires)){ // 4\. CAS 改变 state 获取锁(这里有可能有竞争, 有可能失败)
setExclusiveOwnerThread(current); // 5\. 获取 lock 成功, 设置获取锁的独占线程
return true; // 6\. 直接返回 true
}
}
else if(current == getExclusiveOwnerThread()){// 7\. 判断是否现在获取 独占锁的线程是本线程
int nextc = c + acquires; // 8\. 在 state 计数加1(重入获取锁)
if(nextc < 0){ // overflow
throw new Error("Maximum lock count exceeded");
}
setState(nextc); // 9\. 这里因为是已经获取 lock 所以不考虑并发
return true;
}
return false;
}
/**
* 释放锁 从 AQS 里面获取到的值
*/
protected final boolean tryRelease(int releases){
int c = getState() - releases; // 1\. 释放 releases (因为支持重入, 所以这里的 c 可能不为 0)
if(Thread.currentThread() != getExclusiveOwnerThread()){// 2\. 判断当前的线程是否是获取独占锁的线程
throw new IllegalMonitorStateException();
}
boolean free = false;
if(c == 0){ // 3\. lock 完全释放
free = true;
setExclusiveOwnerThread(null); // 4\. 置空 exclusiveOwnerThread
}
setState(c);
return free;
}
/**
* 判断当前的线程是否是获取 独占锁的线程
*/
protected final boolean isHeldExclusively(){
/**
* While we must in general read state before owner,
* we don't need to do so to check if current thread is owner
*/
return getExclusiveOwnerThread() == Thread.currentThread();
}
final KAbstractQueuedSynchronizer.ConditionObject newCondition(){
return new ConditionObject();
}
/********************** Methods relayed from outer class **************************/
/**
* 获取 独占锁的获取者
*/
final Thread getOwner(){
return getState() == 0 ? null : getExclusiveOwnerThread();
}
/**
* 返回 锁被获取的次数
*/
final int getHoldCount(){
return isHeldExclusively()? getState() : 0;
}
/**
* 判断锁是否被获取了
*/
final boolean isLocked(){
return getState() != 0;
}
/**
* Reconsititues the instance from a stream (that is, desrializes it)
*/
private void readObject(ObjectInputStream s) throws Exception{
s.defaultReadObject();
setState(0);
}
}
nonfairTryAcquire, tryRelease方法都是获取 lock 的模版方法, 主逻辑在 AQS 里面, 下面会详细说明
5. ReentrantLock 获取lock方法 lock()
我们这里以 非公平模式详细说明
# Reentrant 中
public void lock(){
sync.lock();
}
# NonfairSync
void lock() {
if(compareAndSetState(0, 1)){ // 先cas改变一下 state 成功就表示获取
setExclusiveOwnerThread(Thread.currentThread()); // 获取成功设置 exclusiveOwnerThread
}else{
acquire(1); // 获取不成功, 调用 AQS 的 acquire 进行获取
}
}
# FairSync中
final void lock() {
acquire(1);
}
从上诉代码中我们可以看到最终都调用了AQS的 acquire 方法
6. AQS 获取lock方法 acquire
这是一个典型的模版模式, 主逻辑定了了(acquire), 次逻辑交由子类 FairSync, NoFairSync 来实现
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire(int)},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking can be used
* to implement method {@link "Lock#lock}
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire(int)} but is otherwise uninterpreted and
* can represent anything you like
*/
/** acquire 是用于获取锁的最常用的模式
* 步骤
* 1\. 调用 tryAcquire 尝试性的获取锁(一般都是又子类实现), 成功的话直接返回
* 2\. tryAcquire 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号
* 3\. 调用 acquireQueued 进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking)
* 4\. 根据acquireQueued的返回值判断在获取lock的过程中是否被中断, 若被中断, 则自己再中断一下(selfInterrupt)
*
*/
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
acquireXXX方法是用于获取锁的最常用的模式
步骤:
1. 调用 tryAcquire 尝试性的获取锁(一般都是又子类实现), 成功的话直接返回
2. tryAcquire 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号
3. 调用 acquireQueued 进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking)
4. 根据acquireQueued的返回值判断在获取lock的过程中是否被中断, 若被中断, 则自己再中断一下(selfInterrupt)
下面我们细分的讲解tryAcquire反正都是交由子类完成, 若这时没有成功, 则将节点入队列
7. AQS 将节点加入到 Sync Queue 里面方法 addWaiter
当Sync为空时直接 添加到 tail, 若不为空, 再调用 enq方法
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
/**
* 将当前的线程封装成 Node 加入到 Sync Queue 里面
*/
private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(), mode); // 1\. 封装 Node
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if(pred != null){ // 2\. pred != null -> 队列中已经有节点, 直接 CAS 到尾节点
node.prev = pred; // 3\. 先设置 Node.pre = pred (PS: 则当一个 node在Sync Queue里面时 node.prev 一定 != null(除 dummy node), 但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 )
if(compareAndSetTail(pred, node)){ // 4\. CAS node 到 tail
pred.next = node; // 5\. CAS 成功, 将 pred.next = node (PS: 说明 node.next != null -> 则 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null)
return node;
}
}
enq(node); // 6\. 队列为空, 调用 enq 入队列
return node;
}
8. AQS 将节点加入到 Sync Queue 里面方法 enq
Sync Queue 是一个支持并发操作, 双向的队列, 并且它里面始终存在一个 dummy 节点(主要是为了节省head, tail的判空, 减少代码复杂度)
/**
* 这个插入会检测head tail 的初始化, 必要的话会初始化一个 dummy 节点, 这个和 ConcurrentLinkedQueue 一样的
* Insert node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor 返回的是前继节点
*/
/**
* 将节点 node 加入队列
* 这里有个注意点
* 情况:
* 1\. 首先 queue是空的
* 2\. 初始化一个 dummy 节点
* 3\. 这时再在tail后面添加节点(这一步可能失败, 可能发生竞争被其他的线程抢占)
* 这里为什么要加入一个 dummy 节点呢?
* 这里的 Sync Queue 是CLH lock的一个变种, 线程节点 node 能否获取lock的判断通过其前继节点
* 而且这里在当前节点想获取lock时通常给前继节点 打上 signal 的标识(表示当前继节点释放lock需要通知我来获取lock)
* 若这里不清楚的同学, 请先看看 CLH lock的资料 (这是理解 AQS 的基础)
*/
private Node enq(final Node node){
for(;;){
Node t = tail;
if(t == null){ // Must initialize // 1\. 队列为空 初始化一个 dummy 节点 其实和 ConcurrentLinkedQueue 一样
if(compareAndSetHead(new Node())){ // 2\. 初始化 head 与 tail (这个CAS成功后, head 就有值了, 详情将 Unsafe 操作)
tail = head;
}
}else{
node.prev = t; // 3\. 先设置 Node.pre = pred (PS: 则当一个 node在Sync Queue里面时 node.prev 一定 != null, 但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 )
if(compareAndSetTail(t, node)){ // 4\. CAS node 到 tail
t.next = node; // 5\. CAS 成功, 将 pred.next = node (PS: 说明 node.next != null -> 则 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null)
return t;
}
}
}
}
大家若对 dummy 节点感兴趣, 可以先看看 CLH lock的实现
9. AQS 线程获取锁方法 acquireQueued
主逻辑:
-
当当前节点的前继节点是head节点时先 tryAcquire获取一下锁, 成功的话设置新 head, 返回
-
第一步不成功, 检测是否需要sleep, 需要的话就 sleep, 等待前继节点在释放lock时唤醒 或通过中断来唤醒
-
整个过程可能需要blocking nonblocking 几次
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire
*
* @param node the node
* @param arg the acquire argument
* @return {@code} if interrupted while waiting
*/
/**
* 不支持中断的获取锁
*/
final boolean acquireQueued(final Node node, int arg){
boolean failed = true;
try {
boolean interrupted = false;
for(;;){
final Node p = node.predecessor(); // 1\. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
if(p == head && tryAcquire(arg)){ // 2\. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下
setHead(node); // 3\. 获取 lock 成功, 直接设置 新head(原来的head可能就直接被回收)
p.next = null; // help GC // help gc
failed = false;
return interrupted; // 4\. 返回在整个获取的过程中是否被中断过 ; 但这又有什么用呢? 若整个过程中被中断过, 则最后我在 自我中断一下 (selfInterrupt), 因为外面的函数可能需要知道整个过程是否被中断过
}
if(shouldParkAfterFailedAcquire(p, node) && // 5\. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
parkAndCheckInterrupt()){ // 6\. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒
interrupted = true;
}
}
}finally {
if(failed){ // 7\. 在整个获取中出错
cancelAcquire(node); // 8\. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
}
}
}
代码中的 tryAcquire 都是交由子类 FairSync, Sync, NonFairSync来完成, 我们下面来看一下shouldParkAfterFailedAcquire, parkAndCheckInterrupt, cancelAcquire方法.
10. AQS 判断线程是否阻塞方法 shouldParkAfterFailedAcquire
本节点在进行 sleep 前一定需要给 前继节点打上 SIGNAL 标识(
因为前继节点在 release lock 时会根据 这个标识决定是否需要唤醒后继节点来获取 lock,
若释放时 标识是0, 则说明 Sync queue 里面没有等待获取lock的线程, 或Sync queue里面的节点正在获取 lock)
一般流程:
-
第一次进入此方法 前继节点状态是 0, 则 CAS 为SIGNAL 返回 false(干嘛返回的是FALSE <- 主要是为了再次 tryAcquire 一下, 说不定就能获取锁呢)
-
第二次进来 前继节点标志为SIGNAL, ok, 标识好了, 这下就可以安心睡觉, 不怕前继节点在释放lock后不进行唤醒我了
/**
* Checks and update status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
/**
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node){
int ws = pred.waitStatus;
if(ws == Node.SIGNAL){ // 1\. 判断是否已经给前继节点打上标识SIGNAL, 为前继节点释放 lock 时唤醒自己做准备
/**
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
}
if(ws > 0){ // 2\. 遇到个 CANCELLED 的节点 (ws > 0 只可能是 CANCELLED 的节点, 也就是 获取中被中断, 或超时的节点)
/** // 这里我们帮助删除
* Predecessor was cancelled. Skip over predecessors and
* indicate retry
*/
do{
node.prev = pred = pred.prev; // 3\. 跳过所有 CANCELLED 的节点
}while(pred.waitStatus > 0);
pred.next = node; // 跳过 CANCELLED 节点
}
else{
/**
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 4\. 到这里 ws 只可能是 0 或 PROPAGATE (用于 共享模式的, 所以在共享模式中的提醒前继节点唤醒自己的方式,
// 也是给前继节点打上 SIGNAL标识 见 方法 "doReleaseShared" -> "!compareAndSetWaitStatus(h, Node.SIGNAL, 0)" -> unparkSuccessor)
}
return false;
}
10. AQS parkAndCheckInterrupt 方法
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
/**
* 中断当前线程, 并且返回此次的唤醒是否是通过中断
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
logger.info(Thread.currentThread().getName() + " " + "parkAndCheckInterrupt , ThreadName:" + Thread.currentThread().getName());
return Thread.interrupted(); // Thread.interrupted() 会清除中断标识, 并返上次的中断标识
}
10. AQS 删除取消的节点 parkAndCheckInterrupt 方法
清除因中断/超时而放弃获取lock的线程节点(此时节点在 Sync Queue 里面)
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
/**
* 清除因中断/超时而放弃获取lock的线程节点(此时节点在 Sync Queue 里面)
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null; // 1\. 线程引用清空
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0) // 2\. 若前继节点是 CANCELLED 的, 则也一并清除
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next; // 3\. 这里的 predNext也是需要清除的(只不过在清除时的 CAS 操作需要 它)
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED; // 4\. 标识节点需要清除
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { // 5\. 若需要清除额节点是尾节点, 则直接 CAS pred为尾节点
compareAndSetNext(pred, predNext, null); // 6\. 删除节点predNext
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL || // 7\. 后继节点需要唤醒(但这里的后继节点predNext已经 CANCELLED 了)
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 8\. 将 pred 标识为 SIGNAL
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 8\. next.waitStatus <= 0 表示 next 是个一个想要获取lock的节点
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node); // 若 pred 是头节点, 则此刻可能有节点刚刚进入 queue ,所以进行一下唤醒
}
node.next = node; // help GC
}
}
至此整个 不响应中断的获取lock过程就 OK 了, 下面我们来看一下, 响应中断的获取锁, 尝试性的获取lock, 带超时的获取lock
11. ReentrantLock 响应中断的获取 lock
此方法与不响应的唯一区别时, 遇到线程中断直接抛出异常, 获取失败
#Reentrant中
/**
* 带中断的获取锁(被其他线程中断后就直接返回)
*/
public void lockInterruptibly() throws InterruptedException{
sync.acquireInterruptibly(1);
}
#AQS 中的
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implement by the first checking interrupt stats, then invoking
* at least once {@link #tryAcquire(int)}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire(int)}
* until success or the thread is interrupted. This method can be
* used to implement method {@link "Lock#lockInterruptibly}
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire(int)} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
/**
* 支持中断的获取 lock ,若被中断 则直接 放弃(aborting)
*/
public final void acquireInterruptibly(int arg) throws InterruptedException {
if(Thread.interrupted()){ // 1\. 判断线程是否被终止
throw new InterruptedException();
}
if(!tryAcquire(arg)){ // 2\. 尝试性的获取锁
doAcquireInterruptibly(arg); // 3\. 获取锁不成功, 直接加入到 Sync Queue 里面(这里的加入操作在doAcquireInterruptibly里面)
}
}
#AQS 中的
/**
* Acquire in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg) throws InterruptedException{
final Node node = addWaiter(Node.EXCLUSIVE); // 1\. 将当前的线程封装成 Node 加入到 Sync Queue 里面
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor(); // 2\. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
if(p == head && tryAcquire(arg)){ // 3\. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if(shouldParkAfterFailedAcquire(p, node) && // 4\. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
parkAndCheckInterrupt()){ // 5\. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒
throw new InterruptedException(); // 6\. 线程此时唤醒是通过线程中断, 则直接抛异常
}
}
}finally {
if(failed){ // 7\. 在整个获取中出错(比如线程中断)
cancelAcquire(node); // 8\. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
}
}
}
12. ReentrantLock 响应中断及超时的获取 lock
此方法与不响应的唯一区别时, 遇到线程中断/超时直接抛出异常, 获取失败
#Reentrant 中
/**
* 带中断 及 timeout 的获取锁 (线程被中断或获取超时就直接 return )
*/
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException{
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
#AQS 中
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
if(Thread.interrupted()){ // 1\. 判断线程是否被终止
throw new InterruptedException();
}
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); // 2\. 尝试性的获取锁, 获取锁不成功, 直接加入到 Sync Queue 里面(这里的加入操作在doAcquireNanos里面)
}
#AQS 中
/**
* Acquire in exclusive timed mode
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
if(nanosTimeout <= 0L){
return false;
}
final long deadline = System.nanoTime() + nanosTimeout; // 0\. 计算截至时间
final Node node = addWaiter(Node.EXCLUSIVE); // 1\. 将当前的线程封装成 Node 加入到 Sync Queue 里面
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor(); // 2\. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
if(p == head && tryAcquire(arg)){ // 3\. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime(); // 4\. 计算还剩余的时间
if(nanosTimeout <= 0L){ // 5\. 时间超时, 直接返回
return false;
}
if(shouldParkAfterFailedAcquire(p, node) && // 6\. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
nanosTimeout > spinForTimeoutThreshold){ // 7\. 若没超时, 并且大于spinForTimeoutThreshold, 则线程 sleep(小于spinForTimeoutThreshold, 则直接自旋, 因为效率更高 调用 LockSupport 是需要开销的)
LockSupport.parkNanos(this, nanosTimeout);
}
if(Thread.interrupted()){ // 8\. 线程此时唤醒是通过线程中断, 则直接抛异常
throw new InterruptedException();
}
}
}finally {
if(failed){ // 9\. 在整个获取中出错(比如线程中断/超时)
cancelAcquire(node); // 10\. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
}
}
}
13. ReentrantLock 释放 lock
#Reentrant 中
/**
* 释放 lock
*/
public void unlock(){
sync.release(1);
}
#AQS 中
public final boolean release(int arg){
if(tryRelease(arg)){ // 1\. 调用子类, 若完全释放好, 则返回true(这里有lock重复获取)
Node h = head;
if(h != null && h.waitStatus != 0){ // 2\. h.waitStatus !=0 其实就是 h.waitStatus < 0 后继节点需要唤醒
unparkSuccessor(h); // 3\. 唤醒后继节点
}
return true;
}
return false;
}
14. ReentrantLock 一般方法
/**
* 创建 Condition
*/
public Condition newCondition(){
return sync.newCondition();
}
/**
* lock 被获取的次数
*/
public int getHoldCount(){
return sync.getHoldCount();
}
/**
* lock是否被当前的线程获取
*/
public boolean isHeldByCurrentThread(){
return sync.isHeldExclusively();
}
/**
* 锁是否被获取
*/
public boolean isLocked(){
return sync.isLocked();
}
/**
* 是否是公平模式
*/
public final boolean isFair(){
return sync instanceof FairSync;
}
/**
* 获取持有lock的线程
*/
protected Thread getOwer(){
return sync.getOwner();
}
/**
* 是否有线程等待获取lock
*/
public final boolean hasQueuedThreads(){
return sync.hasQueuedThreads();
}
/**
* 当前的线程是否在 AQS Sync Queue 里面等待获取lock
*/
public final boolean hasQueuedThread(Thread thread){
return sync.isQueued(thread);
}
/**
* AQS Sync Queue 里面等待获取锁的线程的长度
*/
public final int getQueueLength(){
return sync.getQueueLength();
}
/**
* AQS Sync Queue 里面等待获取锁的线程
*/
protected Collection<Thread> getQueuedThreads(){
return sync.getQueuedThreads();
}
/**
* 是否有线程在 Condition Queue 里面等待获取锁
*/
public boolean hasWaiters(Condition condition){
if(condition == null){
throw new NullPointerException();
}
if(!(condition instanceof KAbstractQueuedSynchronizer.ConditionObject)){
throw new IllegalArgumentException(" not owber ");
}
return sync.hasWaiters((KAbstractQueuedSynchronizer.ConditionObject)condition);
}
/**
* Condition Queue 里面等待获取锁的长度
*/
public int getWaitQueueLength(Condition condition){
if(condition == null){
throw new NullPointerException();
}
if(!(condition instanceof KAbstractQueuedSynchronizer.ConditionObject)){
throw new IllegalArgumentException("not owner");
}
return sync.getWaitQueueLength((KAbstractQueuedSynchronizer.ConditionObject)condition);
}
/**
* Condition Queue 里面等待获取锁的线程
*/
protected Collection<Thread> getWaitingThreads(Condition condition){
if(condition == null){
throw new NullPointerException();
}
if(!(condition instanceof KAbstractQueuedSynchronizer.ConditionObject)){
throw new IllegalArgumentException("not owner");
}
return sync.getWaitingThreads((KAbstractQueuedSynchronizer.ConditionObject)condition);
}
网友评论