之前写了篇文章介绍了synchronized的原理《Java synchronized 原理从开始到放弃》,而本篇是介绍另外一个高频出现在并发场景的类ReentrantLock,有必要深入理解他的使用和原理。如果你不想看源码的话,直接看图示 + 总结就能摸清整体的流程了。
使用
ReentrantLock使用起来很简单,例如:
ReentrantLock lock = new ReenTrantLock;
...
try {
lock.lock(); // 加锁
//do something();
} finally {
lock.unlock(); // 解锁
}
ReentrantLock 使用起来很简单,甚至比synchronized更直观,直接针对当前线程上锁、解锁,而且还有更多的功能:
- lock() 获得锁就返回true,不能的话一直等待获得锁
- tryLock() 尝试获取锁成功就返回true,不能就直接返回false,不会等待
- tryLock(long timeout,TimeUnit unit) 跟tryLock一样,尝试获取锁,如果超过该时间段还没获得锁,返回false
- lockInterruptibly 获取锁,跟lock不一样的地方是,过程中会检测是否中断(interrupt),若是会抛出异常
- 构造函数
public ReentrantLock(boolean fair)
可设置公平锁还是非公平锁
这里不再细说ReentrantLock的使用方法。
特性
- 可重入锁
- 可响应中断
- 可尝试加锁
- 限时等待尝试加锁
- 公平锁、非公平锁
- 与Condition信号量结合使用
原理
看下ReentrantLock的源码,其中有个重要的变量sync
,是一个继承AbstractQueuedSynchronizer (以下称为AQS) 的Sync抽象类,分别由FairSync、NonfairSync类实现,代表着公平和非公平策略。
可见,ReentrantLock的实现基本依靠AQS实现的,所以如果要理解ReentranLock,有必要学习AQS。
AQS(AbstractQueuedSynchronizer)
AQS 基本数据结构是一个FIFO的双向队列,每个结点Node存储线程和其他信息。队列的头部表示该Node对应的线程已经在执行了,占用了资源;剩下的队列里的线程则被挂起等待唤醒。
在这里插入图片描述接着上面的示意图,来看一下Node结点类:
在这里插入图片描述AQS 有两种模式,一种是独占模式 SHARED ,另外一种是共享模式 EXCLUSIVE;ReentrantLock是独占模式。
SHARED 共享模式,表示只有一个线程能执行
EXCLUSIVE 独占模式,表示可以多个线程同时执行
CANCELLED:1 取消状态,表示这个结点被取消了,可能是被主动取消或者超时,后续这个结点会被踢出队列
SIGNAL: -1 通知后继结点,表示这个结点执行完成以后,需要通知唤醒后继的结点
CONDITION:-2 说明这个结点因为被某一个condition挂起了
PROPAGATE:-3 在共享模式下,下一次获取锁后可以无限传播(不太懂这个意思,后续再好好理解)
除了以上常量,再看下几个变量
waitStatus 当前结点的状态,默认是0,可以是CANCELL、SIGNAL 等
prev 前继结点
next 后继结点
thread 对应的线程
nextWaiter 下一个等待condition的结点
state 状态;是一个int值,表示当前线程占用资源的数量;0表示空闲,没有线程占用;ReentrantLock的state表示线程重入锁的次数
在AQS里面方法分成两类:
- acquire()、acquireInterruptibly()、release() 不以share结尾的方法名;
- acquireShared()、acquireSharedInterruptibly()、releaseShared() 以share结尾的方法名。
顾名思义, 不以share结尾的方法用于独占模式;以share结尾的方法用于共享模式。
有了以上的了解,可以开始分析源码了
ReentrantLock 的 Sync 分公平策略FairSync 和非公平策略 NonfairSync两个实现方式:
1.lock()
ReentrantLock 的 lock 在FairSync和NonfairSync的方式有一些不同
//ReentrantLock
public void lock() {
sync.lock();
}
以下分成两种NonfairSync.lock 和 FairSync.lock:
(1) NonfairSync.lock
// NonfairSync
final void lock() {
if (compareAndSetState(0, 1)) // 锁状态空闲时,尝试直接更新
setExclusiveOwnerThread(Thread.currentThread()); // 设置当前线程为独占锁的拥有者
else // 若失败,走正常流程获取锁
acquire(1);
}
}
compareAndSetState()
就是CAS尝试设置state字段,在ReentrantLock中state字段表示当前线程重入锁的次数,当state为0时候,表示锁是空闲的。compareAndSetState(0, 1)
表示当state是0即空闲时候,直接更新为1,直接CAS上锁;
NonfairSync中,当前线程如果成功了,setExclusiveOwnerThread()
设置当前线程为锁的占有者;
如果失败了,走acquire()
获取锁。
// AQS.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其实AQS中已经实现了FIFO队列的各种操作,子类只需要重写tryAcquire()
和tryRelease()
两个方法;
tryAcquire()
尝试获取锁,在NonfairSync中实现,失败就addWaiter()
新生成一个wait结点,并加入等待队列;
acquireQueued()
表示在加入队列以后,阻塞挂起,被唤醒后再次获取资源,获取失败再次阻塞;
要是失败了就只能selfInterrupt()
中断了。
// NonfairSync
protected final boolean tryAcquire(int acquires) {
// 直接调用AQS的nonfairTryAcquire
return nonfairTryAcquire(acquires);
}
// AQS.java
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // state == 0;锁空闲,尝试cas直接更新state
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// state != 0 但是当前线程 等于 锁占用的线程,表示当前线程可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 更新state
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// AQS.java
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {// 死循环
Node oldTail = tail;
if (oldTail != null) {// 队列不为空
// CAS加入队列尾部
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {// 队列为空,初始化队列
initializeSyncQueue();
}
}
}
// AQS.java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前继结点p
final Node p = node.predecessor();
// 如果前继结点是头节点,尝试加锁tryAcquire
if (p == head && tryAcquire(arg)) { // 加锁成功
setHead(node); // 设置自己为头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire 做一些阻塞前的准备工作
if (shouldParkAfterFailedAcquire(p, node) &&
// 阻塞当前线程
// 并唤醒后会检查在阻塞过程是否发生了中断
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// AQS.java
// pred 表示前继结点,node 表示当前结点
// 主要做两个工作:
// 1. 将前继结点状态设置为SIGNAL,保证前继结点执行完后会唤醒通知后继结点
// 2. 清除掉前面CANCEL状态的结点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) { // 清除Cancel状态的结点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前继结点的staus设置为SIGNAL
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
NonFairSync的原则就是:不要脸,在各种地方尝试不排队直接CAS去拿锁
(2) FairSync.lock
再来看下 FairSync的lock()
实现
// FairSync
final void lock() {
acquire(1);
}
// AQS.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
FairSync里的lock不会先尝试去直接更新,而是循规蹈矩地排队获取资源,调用了acquire()
,流程基本一样的只是在tryAcquire()
实现上不一样
// FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 锁空闲
// 确认没有前继结点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// CAS修改state,上锁
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 当前线程 == 占有线程,表示可重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
FairSync为了表示公平策略,奉行两个原则:
- 不直接抢占锁资源
- 乖乖排队,确认前面没有其他线程再获取锁资源
小结:
在这里插入图片描述 在这里插入图片描述2.tryLock()
// ReentrantLock
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
可见tryLock()
直接尝试nonfairTryAcquire()
,一步到位,能成功就成功,失败就不做任务入队处理。
3. lockInterruptibly()
可中断的lock,如果过程中发生中断会抛出异常
// ReentrantLock
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// AQS
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // 如果当前线程中断
throw new InterruptedException(); // 抛出异常
if (!tryAcquire(arg))
doAcquireInterruptibly(arg); // 跟acquireQueued一样流程,只是过程中会抛出中断异常
}
4. tryLock(long timeout, TimeUnit unit)
在限定时间内尝试加锁,在没看代码之前,我猜测应该是通过CAS + 无限循环这套黄金组合,尝试加锁,如果超过了时间就返回失败。借着这个猜想来看下源码:
// ReentrantLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// AQS
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted()) // 还判断了下是否中断了
throw new InterruptedException();
return tryAcquire(arg) || // 尝试获取资源,否则进入doAcquireNanos
doAcquireNanos(arg, nanosTimeout);
}
// AQS
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout; // 算出超时的时刻
final Node node = addWaiter(Node.EXCLUSIVE);// 加入FIFO队列
try {
for (;;) {// 死循环
final Node p = node.predecessor(); // 前继结点
if (p == head && tryAcquire(arg)) { // 前继结点是首节点,尝试去获取资源
setHead(node); // 成功了
p.next = null; // help GC
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
// 时间差超过要自旋的限定值,不自旋了,直接进入阻塞挂起本线程,nanosTimeout时间后再唤醒
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted()) // 如果在阻塞或者自旋的过程中发生中断,抛出异常
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
可见,加入等待队列之后,作者不只是用持续自旋尝试去获取资源的方式,而且如果发现需要等待的时间太久了,直接调用LockSupport.parkNanos(this, nanosTimeout)
阻塞等待一段时间后再自动唤醒,然后再尝试获取资源,若失败了直接返回失败。
这里有个疑惑,如果前面的结点执行完毕了,提前唤醒了这个限时间的线程,没错,它还是会走完整个流程。
5. unlock()
释放锁资源
// ReentrantLock
public void unlock() {
sync.release(1);
}
// AQS.java
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;// 当前执行的结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
上面说过了,AQS基本实现了所有的逻辑,实现类基本只需要实现tryAcquire()
和 tryRelease()
,这里Sync实现了tryRelease()
,所以FairSync和NonFairSync释放资源的方式是一样的。
// Sync
protected final boolean tryRelease(int releases) {
// 减去重入的数量,如果c == 0,说明释放锁了
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 释放锁了
free = true;
// 设置当前独占锁的拥有者为null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
当前的head结点指向的是在执行的线程,如果后面还有其他结点需要唤醒,那么head的status应该是SINGAL,继续走到unparkSuccessor()
再看一下unparkSuccessor()
怎么唤醒后续的结点起来工作
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);// 修改状态成0
Node s = node.next;
if (s == null || s.waitStatus > 0) { // 下一个为null或者CANCEL
s = null;
// 从尾部往前遍历,去除CANCEL的线程和找出下一个唤醒的线程
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);// 唤醒下一个线程
}
结合上面的lock流程中可以看到,线程是被阻塞在acquireQueued()
方法中,而现在被唤醒的线程就会继续在acquireQueued()
执行。
小结
unlock() 流程基本上是一致的,释放当前占用的资源,如果state == 0 就会释放锁,并且唤醒队列中下一个线程。
疑惑
在上面也许你看不到哪里阻塞的当前的线程,如果你看源码的话就会知道AQS借助LockSupport的park()和unPark()方法来阻塞、唤醒当前线程。其实一直有个疑问,不止是AQS还有ConCurrentHashMap都大量使用了CAS + 轮询的方式实现无锁,那么CAS + 轮询 黄金组合能不能代替真正的锁呢?
后来想想还是不可以的,这里先说两个概念,乐观锁和悲观锁,乐观锁表示别人大概率是不会跟我抢锁的,而悲观锁觉得肯定会有人跟我抢资源的。CAS就是实现乐观锁的手段。
通过 无限制的轮询,借助CAS去尝试获取资源,如果在短时间内获取资源成功了,那代价肯定比重量锁的代价小,相反,如果资源被其他线程长期持有着,那么无限的轮询就是在长时间内白白耗尽CPU资源,这时候还不如直接重量锁,直接阻塞挂起线程,等待唤醒。
可见,CAS + 无限轮询更适合一段时间肯定能获取资源的场景。从上面的doAcquireNanos()
就看出来,作者结合了CAS和阻塞的方式。
不足
以上只是讲了ReentrantLock和AQS常用的lock()、tryLock()等流程,关于共享模式、Condition内容没有讲诉,后面有时间再慢慢研究这部分。
网友评论