1 简言
ReentrantLock:重入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁。
ReentrantLock实现了Lock接口,Lock接口定义的方法:
// 获取锁
void lock();
// 获取锁(可中断)
void lockInterruptibly() throws InterruptedException;
//尝试获取锁,如果没获取到锁,就返回false
boolean tryLock();
//尝试获取锁,如果没获取到锁,就等待一段时间,这段时间没有获取到锁就返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
//条件锁
Condition newCondition();
2 源码分析
2.1 3个内部类
- Sync:实现了AQS的部分方法
- NonfairSync:继承Sync,主要用于非公平锁的获取
- FairSync:继承Sync,主要用于公平锁的获取
2.2 构造方法
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
默认构造方法就是采取的非公平锁,
另一个构造传入boolean型参数,true则为公平锁,false则为非公平锁
2.3 lock()方法
2.3.1 公平锁
调用FairSync类的lock()方法
// FairSync的lock方法
final void lock() {
acquire(1);
}
// 调用AbstractQueuedSynchronizer的acquire方法
public final void acquire(int arg) {
// 先尝试获取锁
// 获取锁失败后,就去排队,这里采用的节点模式是排他模式
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
FairSync.tryAcquire()方法,尝试获取锁
protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取状态变量
int c = getState();
//状态变量为0,可以获得锁
if (c == 0) {
// 如果没有其他线程在排队,那么当前线程尝试更新state的值为1
// 如果成功了,则说明当前线程获取了锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 当前线程获取了锁,把自己设置到exclusiveOwnerThread变量中
// exclusiveOwnerThread是AQS的父类AbstractOwnableSynchronizer中提供的变量
setExclusiveOwnerThread(current);
// 返回true说明成功获取了锁
return true;
}
}
// 如果获得锁线程是当前线程,则加1
// 当前线程本身就占着锁,现在又尝试获取锁,直接让它获取锁并返回true
else if (current == getExclusiveOwnerThread()) {
// 状态变量state的值加1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 这里不需要CAS更新state,当前线程占着锁,其它线程只会CAS把state从0更新成1,是不会成功的
setState(nextc);
return true;
}
//获取不到线程
return false;
}
AbstractQueuedSynchronizer的addWaiter()方法,调用这个方法说明上面尝试获取锁失败了,尝试创建新节点并加到尾结点后面
private Node addWaiter(Node mode) {
// 以给定模式创建一个新节点
Node node = new Node(Thread.currentThread(), mode);
// 这里尝试先把新节点加到尾结点后面
// 成功了就返回新节点
// 没成功再调用enq()方法
Node pred = tail;
//如果尾结点不为空
if (pred != null) {
// 设置新节点的前置节点为现在的尾节点
node.prev = pred;
// CAS更新尾节点为新节点
if (compareAndSetTail(pred, node)) {
// 如果成功了,把旧尾节点的下一个节点指向新节点
pred.next = node;
return node;
}
}
// // 如果上面尝试入队新节点没成功,调用enq()处理
enq(node);
return node;
}
AbstractQueuedSynchronizer的enq()方法,自旋,不断尝试加入的等待队列
private Node enq(final Node node) {
// 自旋,不断尝试
for (;;) {
Node t = tail;
// 如果尾节点为空,说明还未初始化
if (t == null) { // Must initialize
// 初始化头节点和尾节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 如果尾节点不为空
// 设置新节点的前一个节点为现在的尾节点
node.prev = t;
// CAS更新尾节点为新节点
if (compareAndSetTail(t, node)) {
// 成功了,则设置旧尾节点的下一个节点为新节点
t.next = node;
return t;
}
}
}
}
addWaiter()方法,使得新节点已经成功入队了,
acquireQueued()方法,尝试让当前节点获取锁
final boolean acquireQueued(final Node node, int arg) {
// 失败标记
boolean failed = true;
try {
// 中断标记
boolean interrupted = false;
// 自旋
for (;;) {
// 当前节点的前一个节点
final Node p = node.predecessor();
// 如果当前节点的前一个节点为head节点,则说明轮到自己获取锁了
// 调用ReentrantLock.FairSync.tryAcquire()方法再次尝试获取锁
if (p == head && tryAcquire(arg)) {
// 尝试获取锁成功
// 这里同时只会有一个线程在执行,所以不需要用CAS更新
// 把当前节点设置为新的头节点
setHead(node);
// 并把上一个节点从链表中删除
p.next = null; // help GC
failed = false;
return interrupted;
}
// 是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 真正阻塞的方法
parkAndCheckInterrupt())
// 如果中断了
interrupted = true;
}
} finally {
if (failed)
// 取消获取锁
cancelAcquire(node);
}
}
当前节点获取锁失败,需要阻塞,调用shouldParkAfterFailedAcquire方法,
这个方法是在上面的for()循环里面调用的,第一次调用会把前一个节点的等待状态设置为SIGNAL,并返回false,第二次调用才会返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 上一个节点的等待状态
// 注意Node的waitStatus字段我们在上面创建Node的时候并没有指定
// 也就是说使用的是默认值0
// static final int CANCELLED = 1;
// static final int SIGNAL = -1;
// static final int CONDITION = -2;
// static final int PROPAGATE = -3;
int ws = pred.waitStatus;
// 如果等待状态为Singal(等待唤醒),直接返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 如果前一个节点的状态大于0,也就是已取消状态
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 把前面所有取消状态的节点都从链表中删除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 如果前一个节点的状态小于等于0,则把其状态设置为等待唤醒
// 这里可以简单地理解为把初始状态0设置为SIGNAL
// CONDITION是条件锁的时候使用的,PROPAGATE是共享锁使用的
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
真正执行阻塞的方法,调用parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
// 底层调用的是Unsafe的park()方法
LockSupport.park(this);
// 返回是否已中断
return Thread.interrupted();
}
总结:调用关系如下
大致过程:
(1)尝试获取锁 ,如果获取到了就直接返回了;
(2)尝试获取锁失败,再调用addWaiter()构建新节点并把新节点入队;
(3)然后调用acquireQueued()再次尝试获取锁,如果成功了,直接返回;
(4)如果再次失败,再调用shouldParkAfterFailedAcquire()将节点的等待状态置为等待唤醒(SIGNAL);
(5)调用parkAndCheckInterrupt()阻塞当前线程;
(6)如果被唤醒了,会继续在acquireQueued()的for()循环再次尝试获取锁,如果成功了就返回;
(7)如果不成功,再次阻塞,重复(3)(4)(5)直到成功获取到锁。
2.3.2 非公平锁
ReentrantLock.lock()方法
public void lock() {
sync.lock();
}
调用NofairSync的lock方法,这个方法在公平锁模式下直接调用acquire(1);
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
接着调用父类NofairSync的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果状态变量的值为0,再次尝试CAS更新状态变量的值
// 相对于公平锁模式少了 !hasQueuedPredecessors()条件
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
相对于公平锁,非公平加锁的过程有几点不同:
(1)一开始就尝试CAS更新状态变量state,如果成功了就获取到锁了
(2)在tryAcquire()的时候没有检查是否前面有排队的线程,直接上去获取锁才不管别人有没有排队呢;
总结:
总的来说,相对于公平锁,非公平锁在一开始就多了两次直接尝试获取锁的过程。
2.4 lockInterruptibly()方法
支持线程中断,其与lock()的方法的主要区别在于lockInterruptibly()获取锁的时候,如果线程中断了,会抛出一个异常,而lock()不会去管线程是否中断都会一直尝试获取锁,获取锁之后把自己标记为已中断,继续执行自己的逻辑,后面也会正常释放锁。
注意:线程中断,只是在线程上打一个中断标志,并不会对运行中的线程有什么影响,具体需要根据这个中断标志干些什么,用户自己去决定;
比如,如果用户在调用lock()获取锁后,发现线程中断了,就直接返回了,而导致没有释放锁,这也是允许的,但是会导致这个锁一直得不到释放,就出现了死锁。
lock.lock();
if(Thread.currentThread().interrupted()){
return;
}
lock.unlock();
2.5 tryLock()方法
尝试获取一次锁,成功了就返回true,没成功就返回false,不会继续尝试。
public boolean tryLock() {
// 直接调用Sync的nonfairTryAcquire()方法
return sync.nonfairTryAcquire(1);
}
tryLock()方法比较简单,直接以非公平的模式去尝试获取一次锁,获取到了或者锁本来就是当前线程占有着就返回true,否则返回false。
2.6 tryLock(long timeout, TimeUnit unit)方法
尝试获取锁,并等待一段时间,如果在这段时间内都没有获取到锁,就返回false。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
调用AbstractQueuedSynchronizer的tryAcquireNanos方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果线程中断了,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 先尝试获取一次锁
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
调用AbstractQueuedSynchronizer的doAcquireNanos方法
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果时间已经到期了,直接返回false
if (nanosTimeout <= 0L)
return false;
// 到期时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 如果到期了,直接返回false
if (nanosTimeout <= 0L)
return false;
// spinForTimeoutThreshold = 1000L;
// 只有到期时间大于1000纳秒,才阻塞
// 小于等于1000纳秒,直接自旋解决就得了
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryLock(long time, TimeUnit unit)方法在阻塞的时候加上阻塞时间,并且会随时检查是否到期,只要到期了没获取到锁就返回false
2.7 unlock()方法
public void unlock() {
sync.release(1);
}
调用 AbstractQueuedSynchronizer.release
public final boolean release(int arg) {
// 调用AQS实现类的tryRelease()方法释放锁
if (tryRelease(arg)) {
Node h = head;
// 如果头节点不为空,且等待状态不是0,就唤醒下一个节点
// 在每个节点阻塞之前会把其上一个节点的等待状态设为SIGNAL(-1)
// 所以,SIGNAL的准确理解应该是唤醒下一个等待的线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock的公平锁的tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果当前线程不是占有着锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果状态变量的值为0了,说明完全释放了锁
// 这也就是为什么重入锁调用了多少次lock()就要调用多少次unlock()的原因
// 如果不这样做,会导致锁不会完全释放,别的线程永远无法获取到锁
if (c == 0) {
free = true;
// 清空占有线程
setExclusiveOwnerThread(null);
}
// 设置状态变量的值
setState(c);
return free;
}
unparkSuccessor方法,这里的node是头节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 如果头节点的等待状态小于0,就把它设置为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 头节点的下一个节点
Node s = node.next;
// 如果下一个节点为空,或者其等待状态大于0(实际为已取消)
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点向前遍历取到队列最前面的那个状态不是已取消状态的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果下一个节点不为空,则唤醒它
if (s != null)
LockSupport.unpark(s.thread);
}
释放锁的过程:
(1)将state的值减1
(2)如果state减到了0,说明已经完全释放锁了,唤醒下一个等待着的节点;
网友评论