我们经常使用Reentranklock来进行加锁,Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。
理解AbstractQueuedSynchronizer的源码将让大家对锁的认识更加深刻。Reentranklock有公平锁和非公平锁的实现,同时还实现了锁的可重入,锁的性能比较稳定,在并发量很高的场景中,相对于synchronized锁的实现,效率会更高,因为synchronized在升级成重量锁后会让系统开销很大,废话不多说啦,直接进入源码分析吧。
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
从源码可以看出来,Reentrantlock的同步默认使用的非公平锁,如果需要公平锁,可以自己设置。
然后我们开始进入trylock方法的源码。
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//c为获取到锁的状态,无锁状态下state为0
if (c == 0) {
//通过CAS获取锁,然后通过将锁设置为当前锁持有。
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果持有锁的线程为当前线程,那么将state的值加1,实现了重入的功能。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
//如果nextc < 0 这时说明超出了限制
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); // 设置state状态,同时返回true。
return true;
}
return false; // 没有获取到,返回false。
}
同时看下非公平锁下的释放锁操作:
protected final boolean tryRelease(int releases) {
// 将state的值减1,
int c = getState() - releases;
//判断是否是当前锁持有线程,如果不是,就跑出异常。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果这个时候的state正好为 0,说明此时锁的没有被任何线程所占有。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null); //设置占有所得为空。
}
setState(c); //设置state的状态。
return free; //标记释放锁是否成功。
}
然后我们看下公平锁下线程是如何通过tryAcquire获取到锁的。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//同样回会去判断锁的状态,如果为0,那么就是当前没有线程持有锁
if (c == 0) {
//主要看hasQueuedPredecessors方法,同时cas一下就行。这里维护了一个clh队列,需要排队来获取锁。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current); //设置持有锁的线程为当前线程。
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果持有锁的线程为当前线程。
int nextc = c + acquires; //这个时候会实现可重入。
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//如果满足下面。如果(s = h.next) == null 满足的话,那么就是head还没有指向tail,这个时候
//正好有别的线程在入队列,那么肯定没有队列来这里。这个时候CLH队列中是存在元素的,
//而不是没有元素。(s = h.next) !=null 的话,说明里面至少有一个有效节点,
//并且s.thread == Thread.currentThread()也满足的话,说明等待的节点的线程和现有的线程是相同的。
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
同时可以看一下添加了时间参数的trylock是如何实现的。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
//
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L) // 超出时间
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//如果为头节点,然后后去到了
if (p == head && tryAcquire(arg)) {
setHead(node); //将head设置为当前节点
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // 超出时间
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//放入队列中
private Node addWaiter(Node mode) {
//
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果队列不为空
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果为空,那么新建一个队列。
enq(node);
return node;
}
然后我们看看enq的实现。
补充说明下队列中waitstatus通过枚举有几种不同的状态,下文中会用到waitstatus的状态。
Node的值分别为0(初始化后默认的状态),cancelled 为1表示线程获取锁的请求取消了,conditon为 -2 表示在队列中等待,等待着唤醒,singal为 -1,表示线程准备好了,等待着资源的释放。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//看是否获取到
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //标记是否拿到资源
try {
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断。
for (;;) {
// 获取到当前节点的头驱节点。
final Node p = node.predecessor();
//如果当前节点的头驱节点是头节点,并且当前节点获取到锁
if (p == head && tryAcquire(arg)) {
setHead(node); //设置该节点为头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
//失败获取到,两种情况,该节点的前驱节点是头节点,但是被在不公平的环境下被
//其他节点获取到了,或者就不是头节点。
//parkAndCheckInterrupt 用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) //失败的话,停止获取
cancelAcquire(node);
}
}
//
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//查看前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //signal -1 线程准备好了,等待资源的释放。
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // 如果大于0,线程获取锁的请求被取消
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 循环向前查找取消节点,把取消节点给删除掉。
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //这个时候为0 或者propagate (这个是处于shard下使用的)
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上文中finally中的cancelAcquire又是什么操作呢?我们去源码看一下。
private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过取消状态的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
上文中分析了加锁的过程,那么我们接下来看下解锁是如何实现的。同样,进入到unlock方法的实现中。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//通过tryRelease查看是否被释放
if (tryRelease(arg)) {
Node h = head;
// 如果获取到节点状态,如果h!=null,说明有后驱节点。
// 如果waitstatus的状态为0,那么后驱节点可能在运行中,不需要唤醒。
//如果为小于0,那么后继节点被阻塞啦,需要被唤醒了。
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//在reentranklock的源码中有关于释放的,这个和前文中的tryAcquire很像。
protected final boolean tryRelease(int releases) {
//state的状态值
int c = getState() - releases;
//如果不是当前线程, 那么抛出异常。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//记录状态。
boolean free = false;
//如果被状态值被释放完。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null); //设置所属的线程为空。
}
setState(c); //设置状态。
return free; //返回是否释放成功的标记。
}
还有一个很大的疑惑为啥都是从后面像前面找节点然后进行条件剔除呢,而不是从前到后遍历?
一个原因是tail入队时,还没有等到pre.next = node执行,这个时候就需要从后向前找值啦。
还有一个原因是在产生canne节点时,会先断开next,再断开pre。
上文主要说了reentranklock的主要方法,还有些方法以后接着更新。
其中的ReentrankLock中使用的AQS应用广泛,需要我们了解其实现原理。
参考资料:
1.ReentrankLock源码
2.https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
3.Lea D. The java. util. concurrent synchronizer framework[J]. Science of Computer Programming, 2005, 58(3): 293-309.
网友评论