上一篇文章对ReentrantLock锁进行了概述,相信看完了的话应该对ReentrantLock锁的使用有了一定的了解,这篇文章分析下ReentrantLock锁的实现机制。
首先需要了解ReentrantLock类里面有三个静态类:Sync
、NonfairSync
和FairSync
,ReentrantLock的锁内部实现通过NonfairSync和FairSync实现,而NonfairSync和FairSync都是Sync的子类。类图如下所示:
AbstractQueuedSynchronizer
,类如其名,抽象的队列式的同步器,通常被称之为AQS类,它是一个非常有用的父类,可用来定义锁以及依赖于排队park
线程(这里的park你可能不明白什么意思,但是你肯定知道阻塞的意思,其实park类似阻塞,很多文章直接就说是阻塞,但是我感觉两者还是不一样的,park可以看做是一种轻量级的阻塞,至少我是这样理解的,欢迎指正,unpark类似唤醒线程但是比唤醒线程高效)的其他同步器。AbstractQueuedLongSynchronizer 类提供相同的功能但扩展了对同步状态的 64 位的支持。两者都扩展了类 AbstractOwnableSynchronizer(一个帮助记录当前保持独占同步的线程的简单类)。AQS框架是整个JUC锁的核心部分,提供了以下内容与功能:
- Node 节点, Sync Queue和Condition Queue的存放的元素,用来保存park线程, 这些节点主要的区分在于 waitStatus 的值
- Condition Queue, 这个队列是用于独占模式中, 只有用到 Condition.awaitXX 时才会将 node加到 tail 上(PS: 在使用 Condition的前提是已经获取 Lock)
- Sync Queue, 独占和共享的模式中均会使用到的存放 Node 的 CLH queue(主要特点是, 队列中总有一个 dummy 节点, 后继节点获取锁的条件由前继节点决定, 前继节点在释放 lock 时会唤醒sleep中的后继节点),CLH队列的CLH全称是Craig, Landin, and Hagersten
- ConditionObject, 用于独占的模式, 主要是线程释放lock, 加入 Condition Queue, 并进行相应的 signal 操作
- 独占的获取lock (acquire, release), 例如 ReentrantLock 就是使用这种,
- 共享的获取lock (acquireShared, releaseShared), 例如 ReentrantReadWriteLock、Semaphore、CountDownLatch
所以,理解了AbstractQueuedSynchronizer 机制就理解了ReentrantLock锁机制。
AbstractQueuedSynchronizer
继承自AbstractOwnableSynchronizer
:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
/**所有的变量的都是transient的,为啥还要提供序列化ID呢?*/
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
// The current owner of exclusive mode synchronization.
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractOwnableSynchronizer
类名能够很清晰的表达它的意思:独有线程同步器,提供了线程独占的两个方法setExclusiveOwnerThread(Thread thread)
和getExclusiveOwnerThread()
内部类 Node
AQS有一个很重要的内部类Node,这个类会装载线程,看下它的源码:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {//Returns true if node is waiting in shared mode.
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {//
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
可以看到,Node类里首先定义了6个static final的变量,用来区分是共享锁还是独占锁以及等待线程的五种状态(1、-1、-2、-3再额外加一个0的状态),接着定义了四个volatile变量和一个Node类型的普通变量nextWaiter: Node。我们需要着重分析下这四个volatile变量和nextWaiter变量。
- volatile int waitStatus:
- CANCELLED值为1表示当前节点装载的线程因超时或者中断而取消,Nodes never leave this state. In particular, a thread with cancelled node never again blocks.
- SIGNAL值为-1表示后继节点被park了,如果当前节点释放锁了或者装载的线程取消了,那么需要unpark自己的后继节点,To avoid races, acquire methods must first indicate they need a signal, then retry the atomic acquire, and then, on failure, block.
- CONDITION值为-2表示在条件队列(condition queue)中,It will not be used as a sync queue node until transferred, at which time the status will be set to 0. (Use of this value here has nothing to do with the other uses of the field, but simplifies mechanics.)
- PROPAGATE值为-3表示后续的acquireShared操作能够得以执行,A releaseShared should be propagated to other nodes. This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.
- 值为0表示当前节点在同步队列(async queue)中等待着获取锁
- volatile Node prev:前驱节点
- volatile Node next:后继节点
- volatile Thread thread:当前节点装载的线程
- Node nextWaiter:这个后继节点和后继节点next是有区别的,nextWaiter后继节点是指在条件队列(condition queue)中的后继节点或者是共享节点,因为条件锁必须是独占模式,在共享模式中我们也可以使用这个成员变量表示后继节点节点,这样我们就节省了一个成员变量。注意只有nextWaiter而并没有定义preWaiter。Link to next node waiting on condition, or the special value SHARED. Because condition queues are accessed only when holding in exclusive mode, we just need a simple linked queue to hold nodes while they are waiting on conditions. They are then transferred to the queue to re-acquire. And because conditions can only be exclusive, we save a field by using special value to indicate shared mode.
从上面我们可以看出,同步队列async queue是一个双向队列,而条件队列condition queue是一个单向队列只有nextWaiter而没有preWaiter。
关于waitStatus
这个变量,源码中的注释写的非常清楚,我不翻译了,大家细心看下:
The values are arranged numerically to simplify use. Non-negative values mean that a node doesn't need to signal. So, most code doesn't need to check for particular values, just for sign. The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes. It is modified using CAS (or when possible, unconditional volatile writes).
内部类 ConditionObject
这个类是用来维护条件队列(condition queue)的,这个类和LinkedList很像,就是一个单向的列表,里面维护了一个头结点firstWaiter和尾节点lastWaiter以及两个十分重要的方法await()
和signal()
。先来看下这个类的一个很核心的await
方法:
public final void await() throws InterruptedException {
//加入条件等待队列前需要进行中断判断
if (Thread.interrupted())
throw new InterruptedException();
//加入条件等待队列
Node node = addConditionWaiter();
//释放锁
int savedState = fullyRelease(node);
//中断标记位
int interruptMode = 0;
/*死循环阻塞,直到被通知或者被中断:
1) 当被通知唤醒时还得判断一下当前节点是否已经转移到AQS同步队列当中(其实主动通知的线程会确保其后继等待节点转移到同步队列中,所以被通知后在下一次循环条件为false,继续后续流程);
2) 当被中断唤醒时需要确保节点被转移到同步队列中,然后根据中断发生在被通知前后位置设置中断模式,并跳出循环
关于中断模式:
1) 当在被通知前被中断则将中断模式设置为THROW_IE;
2) 当在被通知后则将中断模式设置为REINTERRUPT(因为acquireQueued不会响应中断)。
*/
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
/*死循环获取同步状态,并在同步状态获取成功或者取消获取时设置中断模式:
如果在被通知之后获取锁过程中发生中断则将中断模式设置为REINTERRUPT。
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//重新链接CONDITION节点。
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//根据中断模式抛出异常。
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
注意:被中断的线程跳出while循环后,会调用acquireQueued方法自旋获取锁,尝试获取同步状态,而不是立即响应中断抛出中断异常。在最后根据中断模式来决定是否抛出异常。
再来看下signal()
方法:
//通知头结点到同步队列中去竞争锁,用于独占模式
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/*
从头结点向后遍历直到遇到一个非canceled或者null的节点
并将其移除条件等待队列,并添加到同步队列的尾部
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
}
//当头结点转移失败后,继续重试并确保更新后的头结点不为null
while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//如果CAS失败,说明节点在signal之前被cancel了,返回false
//CAS尝试将节点的waitStatus从CONDITION-2修改为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//进入同步队列,返回的p是这个节点在同步队列中的前驱节点
Node p = enq(node);
//获取前驱节点的状态
int ws = p.waitStatus;
//1 如果前驱节点取消则直接唤醒当前节点线程
//2 或者前驱节点没取消的话,将前驱节点状态设置为SIGNAL(-1),保证能被前驱节点通知到,如果设置失败,直接唤醒当前节点的线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
可以看到,更新waitStatus的操作是依赖Unsafe的CAS机制进行的。
总结:
- 每一个创建的ConditionObject都维持这各自的一个单向的等待队列,但是每个ConditionObject都共享一个AQS的FIFO同步队列,当调用await方法时释放锁并进入阻塞状态,调用signal方法将条件等待队列中的首节点线程移动到AQS同步队列中并将其前继节点设置为SIGNAL或者直接唤醒线程使得被通知的线程能去获取锁。
- 调用
await
方法释放锁并将线程添加到条件等待队列中并没有采用死循环CAS设置(参考AQS.enq方法),因为Condition对象只能用于独占模式,而且在调用await之前会显示的获取独占锁,否则会抛出非法监视器状态异常。 - 调用
signal
方法将转移等待节点,也不需要CAS来保证,因为signal会确保调用者caller是获取独占锁的线程(通过isHeldExclusively方法来判断,如果为false会抛出非法监视器状态的异常)。
AQS的同步状态
AQS的int型成员变量state
表示同步状态,它是用volatile修饰:
private volatile int state;
在互斥锁中它表示着线程是否已经获取了锁,0未获取,1已经获取了,大于1表示重入数。同时AQS提供了getState()、setState()、compareAndSetState()方法来获取和修改该值:
可重入锁指的是在一个线程中可以多次获取同一把锁,比如:一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则该线程可以直接执行调用的方法,而无需重新获得锁。synchronized锁可以看做重入锁。但要注意,获取多少次锁就要释放多么次,这样才能保证state是能回到零态的。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加)。
所以可重入数大于1表示该线程可能调用了多个需要当前锁的方法,或同一个线程调用了多次lock()方法。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
同步队列async queue(CLH)
CLH是一个FIFO线程等待队列,多线程争用资源被阻塞时会进入此队列,它的头节点时虚拟节点。 CLH同步队列(图片来自引用4)独占锁acquire/release方法解析
对于获取锁的流程,这张图比直接贴代码更加直观,希望结合源码仔细看下,图片来自文献3: acquire获取锁流程- "当前线程"首先通过
tryAcquire()
(大部分由子类实现)尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。 - "当前线程"尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程"加入到"CLH队列"尾部。CLH队列就是线程等待队列。
- 执行完addWaiter(Node.EXCLUSIVE)之后,调用acquireQueued方法获取锁,判断当前线程的Node == head,如果不是,调用park方法,让当前线程进入休眠状态,等待唤醒;
- "当前线程"在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果"当前线程"在休眠等待过程中被中断过,acquireQueued会返回true,此时"当前线程"会调用selfInterrupt()来自己给自己产生一个中断。至于为什么要自己给自己产生一个中断,后面再介绍。
- 当当前的Node是头的时候,再次回调tryAcquire,子类业务逻辑修改整个AQS队列的state的状态, 设置为头节点,next指针==null,方便GC,返回执行对应的业务逻辑;
对于锁的释放,源码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
它调用tryRelease()来释放资源,这个tryRelease()方法一般都由子类实现。
在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。看下unparkSuccessor()的源码,它在AQS中实现:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个离头节点最近的一个等待位置。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次循环判断p==head就成立了),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了。
共享锁acquireShared/releaseShared方法解析
acquireShared(int)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared()方法依然需要自定义同步器去实现,但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:
- tryAcquireShared()尝试获取资源,成功则直接返回;
- 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。
看下doAcquireShared源码:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入队列尾部
boolean failed = true;//是否成功标志
try {
boolean interrupted = false;//等待过程中是否被中断过的标志
for (;;) {
final Node p = node.predecessor();//前驱
if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
int r = tryAcquireShared(arg);//尝试获取资源
if (r >= 0) {//成功
setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
p.next = null; // help GC
if (interrupted)//如果等待过程中被打断过,此时将中断补上。
selfInterrupt();
failed = false;
return;
}
}
//判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
重点分析一下获取锁后的操作:setHeadAndPropagate:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
/**
* 如果读锁(共享锁)获取成功,或头部节点为空,或头节点取消,或刚获取读锁的线程的下一个节点为空,
* 或在节点的下个节点也在申请读锁,则在CLH队列中传播下去唤醒线程,怎么理解这个传播呢,
* 就是只要获取成功到读锁,那就要传播到下一个节点(如果一下个节点继续是读锁的申请,
* 只要成功获取,就再下一个节点,直到队列尾部或为写锁的申请,停止传播)。具体请看doReleaseShared方法。
*
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
其实跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作,这体现了共享锁的定义。
releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于可重入的考量;而共享模式下的releaseShared()则没有这种要求,一是共享的实质--多线程可并发执行;二是共享模式基本也不会重入吧(至少我还没见过),所以自定义同步器可以根据需要决定返回值。
doReleaseShared()源码:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 从队列的头部开始遍历每一个节点
int ws = h.waitStatus;
// 如果节点状态为 Node.SIGNAL,将状态设置为0,设置成功,唤醒线程。
// 为什么会设置不成功,可能改节点被取消;还有一种情况就是有多个线程在运行该代码段,这就是PROPAGATE的含义吧。
// 如果一个节点的状态设置为 Node.SIGNAL,则说明它有后继节点,并且处于阻塞状态
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果状态为0,则设置为Node.PROPAGATE,设置为传播,该值然后会在什么时候变化呢?
// 在判断该节点的下一个节点是否需要阻塞时,会判断,如果状态不是Node.SIGNAL或取消状态,
// 为了保险起见,会将前置节点状态设置为Node.SIGNAL,然后再次判断,是否需要阻塞。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
/**
* 如果处理过一次 unparkSuccessor 方法后,头节点没有发生变化,就退出该方法,那head在什么时候会改变呢?
* 当然在是抢占锁成功的时候,head节点代表获取锁的节点。一旦获取锁成功,则又会进入setHeadAndPropagate方法,
* 当然又会触发doReleaseShared方法,传播特性应该就是表现在这里吧。再想一下,同一时间,可以有多个多线程占有锁,
* 那在锁释放时,写锁的释放比较简单,就是从头部节点下的第一个非取消节点,唤醒线程即可,
* 为了在释放读锁的上下文环境中获取代表读锁的线程,将信息存入在 readHolds ThreadLocal变量中。
*/
if (h == head) // loop if head changed
break;
}
}
参考文献
网友评论