前言
心血来潮想到要做一些源码分析,想想从ReentrantLock类开始挺合适的,就尝试着先写一篇吧,废话不多说进入正题。
正文
说到ReentrantLock类首先肯定得看一下内部类Sync,其定义如下(此处仅保留了一些重要部分,后面的源码片段也会如此操作)
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
}
除了Sync还有两个继承了Sync的内部类,分别是NonfairSync和FairSync,分别对应了非公平锁和公平锁
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
两者的差别就是lock()方法和tryAcquire(int)方法的不同,其中lock()方法是Sync类定义的,而tryAcquire(int)方法是AbstractQueuedSynchronizer类定义的,这个类就是锁类ReentrantLock和ReentrantReadWriteLock的核心,被称之为AQS的类,晚些再讲解。这里顺带提一下,这种父类中定义一个抽象方法留待不同子类去具体实现的模式就是设计模式中的模板模式。继续讲源码,先来看一下ReentrantLock的lock()方法,可以看出实际的实现是由内部类Sync的实现类来处理,具体是FairSync还是NonfairSync的,就要看初始化时候的传参了。
public void lock() {
sync.lock();
}
对比公平和非公平锁的lock()实现,acquire(1)这步是相同的,不同的地方在于非公平锁要进行首次抢占,优先执行compareAndSetState(0, 1)。这个方法是属于AQS的,实现如下:
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
这里就碰到了非常经典的CAS方法,简单来说就是设置一个新值之前要判断原值与期望值是否相同,相同才会设置新值,并且会自旋重试,直到新值设置成功为止,CAS厉害在这一系列的动作都是原子的,避免了并发问题。同时unsafe对象是Unsafe类的实例,该类就是jdk中专门用来调用这些封装好的原子操作的。
此处CAS方法是第一次调用lock()方法时才会得到true,进而执行setExclusiveOwnerThread(Thread.currentThread())方法,其它情况下都是执行acquire(1)方法去尝试获取锁。setExclusiveOwnerThread(Thread.currentThread())方法即是将当前线程设置到exclusiveOwnerThread属性上,即表示当前线程获得了锁。
回过头来看acquire(1)方法,该方法属于AQS,具体实现如下
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
可以看出首当其冲就是调了tryAcquire方法,之前刚好提过,该方法的具体实现是交由Sync的子类的。这里非公平锁也是很有趣,实现tryAcquire方法就是又回过去调用了Sync的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
将这个方法和公平锁的tryAcquire对比一下发现,相似度99%。唯一不同的地方只有公平锁中多了!hasQueuedPredecessors()这个判断条件
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
说到hasQueuedPredecessors()这个方法,就不得不讲AQS的设计结构以及其中的CLH队列,这里可以有一个简单的概念就是AQS中维护了一个存放线程的链表,为了不扩展的太远,所以先来分析非公平锁的流程。
首先根据getState()获得当前线程的状态c,可以看到c为0时可以获得锁,这与之前提到的非公平锁的第一步操作又是相似度极高。当c不为0时,执行current == getExclusiveOwnerThread()判断,为true说明当前线程是占用着该锁的线程,对c进行累加操作直到c溢出时抛异常。此时跳回AQS的acquire方法,当tryAcquire(1)的结果为false就有必要进行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的判断了,接着就来看看这个方法。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里入参需要一个node,而方法调用时填的是addWaiter(Node.EXCLUSIVE),这里的EXCLUSIVE指的是独占的意思,意味着这是一个独占锁的操作,只有一个线程可以占有锁,与之相对的还有一种是SHARED,即共享锁,ReentrantReadWriteLock就是一种共享锁,这个以后有机会再展开。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
接着看addWaiter()方法的实现,这个方法概括起来就是将新生成的Node.EXCLUSIVE节点放到原来的队尾节点的后面变成新的队尾节点。这里用到了compareAndSetTail(pred, node)这个方法,看一下实现
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
又看到了熟悉的unsafe的CAS操作,很明显就是为了防止并发的情况。
继续拉回来看acquireQueued()方法,这里的主体是一个for循环,还是一个死循环,唯一跳出循环的可能是return interrupted这里,一看这条件我真的是倒吸一口凉气,里面递归调用了tryAcquire()做为判断条件,说实话这种操作不是对全局有很强的把控的话真的不敢随便乱用。回过头去看一眼tryAcquire()方法为true的条件是什么,当然就是当前线程必须要获得锁。而p == head就是判断p这个节点是不是头节点,那么这个p又是个,来看p = node.predecessor()的代码
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
一看这方法其实就是返回上一个节点,p == head判断的就是上一个节点是否是头结点。连起来看就是当上一节点是头结点并且能获得锁的情况下结束循环返回false,如下的整个acquire()方法就结束了,整个lock()过程也就结束了。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
跳回for循环中,如果不能触发return又是一直在做什么呢
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
首先看一下shouldParkAfterFailedAcquire(p, node)在干什么
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
可以看到这个方法返回值是boolean类型的,根据描述当线程需要阻塞时返回true。其中的判断条件只有一个,就是pred.waitStatus,即前一个节点的waitStatus状态。
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
根据常量的定义和属性的描述可以看出,waitStatus的取值有0,1,-1,-2,-3这几种情况,正常情况下初始值为0,并且这是一个volatile类型的属性,为了保证可见性更新这个值需要CAS操作。然后看判断条件,这里分成了三种情况,第一种状态为SIGNAL,即等待状态,线程需阻塞;第二种waitStatus>0,其实也只有CANCELLED这一种情况,这时做的事情是递归的将当前节点连接到前一个节点的上一个节点,也就是从这条链上去除掉所有CANCELLED状态的节点;第三种也就是剩下的状态,调用compareAndSetWaitStatus方法,来看一下其实现
private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);
}
又见到了unsafe的CAS操作,那么显而易见第三种情况下的操作就是将上一个节点状态置为SIGNAL。
这里可以总结一下shouldParkAfterFailedAcquire的作用了,那就是根据上一个节点的情况来做相应的操作。接着就要看parkAndCheckInterrupt()方法了。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
关于这个LockSupport类,这里不准备再深入了,里面又是调用到了unsafe进行一系列较为底层的操作,见但介绍一下LockSupport这个类,它是用来创建锁和其他同步类的基本线程阻塞原语。LockSupport 提供park()和unpark()方法实现阻塞线程和解除线程阻塞,暂时了解到这一层就够了。
看上去流程基本是讲完了,该总结一下非公平锁和公平锁的差异了,等等,好像之前hasQueuedPredecessors()方法还晾在一边没讲。
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
总结一下,这个方法做的事情是判断等待队列中有没有节点。都到这里了,是时候引出没讲的AQS了。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
}
public abstract class AbstractOwnableSynchronizer{
private transient Thread exclusiveOwnerThread;
}
我把所有的方法和细枝末节的东西都删了,呈现出来类结构是这样的,AQS连同它继承的AOS类一共就只有这些属性,重点是exclusiveOwnerThread属性,这个属性就是持有锁的线程,也就是真正的boss。整个锁其实本质就是围绕两个点,线程和队列,内部类NODE有一个注释写的非常好,有必要放一下。
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
这个注释真的是高,一下子就展示出了CLH队列的含义,这个CLH队列中每一个节点就是一个NODE对象,NODE对象上记录了对应的排队获取锁的线程,状态用waitStatus表示,规则是tail入队head出队,当然这里的入队和出队操作都是基于CAS封装的原子操作。分析到这里我们就可以总结出整个锁的核心原理了,==那就是通过CLH队列将多线程争抢锁的并行过程变成队列中排队获取锁的串行过程==。
最后用一张网上看到的画得非常好的流程图来区别一下公平锁和非公平锁的区别,区别有两点:==一、非公平锁多了首次抢占;二、公平锁在基于CLH队列获取锁的过程中还要判断队列中是否有等待节点==。这两点也很好的展示了两种锁模式的特性。
NonfairSync
lock —> compareAndSetState
| —> setExclusiveOwnerThread
—> accquire
| —> tryAcquire
|—>nonfairTryAcquire
|—> acquireQueued
FairSync
lock —> acquire
| —> tryAcquire
|—>!hasQueuePredecessors
|—>compareAndSetState
|—>setExclusiveOwnerThread
|—> acquireQueued
总结
本文主要是对ReentrantLock中重点源码的一个梳理和总结,讲解了获取锁的核心原理,并且对比了公平锁和非公平锁的区别。后续考虑逐步把concurrent包里的几个常用的重要类都讲一下,但是写一篇分析真的太耗时间了,我自己也不确定能写几篇,走着看吧。
网友评论