一.ReentrantLock概述
ReentrantLock是基于AQS(AbstractQueuedSynchronizer)实现的,AQS是并发包的基础,CountDownLatch,FutureTask,Semaphore等都是基于AQS实现的。
二.AQS简介
AQS是基于FIFO队列(CLH队列)实现的,因此存在一个个节点,节点里面主要有:
//表示node处于共享模式
static final Node SHARED = new Node();
//表示node处于独占模式
static final Node EXCLUSIVE = null;
//因为超时或者中断,node被设置为取消状态,被取消的node不能去竞争锁,只能保持取消状态不变,处于这种状态的node会被提出队列,被gc回收。
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
//表示这个node在条件队列中
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
三.ReentrantLock的实现
ReentrantLock中有一个抽象嵌套类Sync,两个嵌套类NonfairSync和FairSync继承Sync实现lock()和tryAcquire()方法分别对应公平sync和非公平Sync。ReentrantLock中主要的嵌套类和API如下图。
image.png
ReentrantLock默认为非公平锁,可以通过在构造函数中传入true来定义一个公平锁实例:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
下面来看一下非公平锁的加锁过程,假如实例一个非公平锁,线程一调用lock()方法
public void lock() {
sync.lock();
}
lock()方法在公平Sync和非公平Sync中实现不同,下面是非公平的Sync中lock的实现:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
compareAndSetState定义在AQS类中,使用cas操作更新state的值为1.
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
//stateOffset是state的地址偏移量,会在类加载时候赋值。
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
由此可知线程一获取锁的时候做两件事:
1.使用cas操作将AbstractQueuedSynchronized的state更新为1。
2.将AOS的thread设置为当前线程。
此时线程2执行调用lock方法时cas操作将会失败,因为期望值已经被线程一更新为1。则线程二会执行acquire方法。
//acquire方法定义在AQS类中,tryAcquire为抽象方法,需要实现AQS的类重写该方法。
//以独占的方式尝试获取锁,忽略中断,tryAcquire方法需子类自己实现,如果尝试获取锁失败,则将节点入队
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//NonfairSync中
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
先来看看Sync中尝试获取锁的函数:
//最终会调用Sync中的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获得state
int c = getState();
//再次cas,如果没有线程占用锁,则线程二可以独占锁
//这地方有第一个疑问,有没有可能线程一占用锁,此时线程二执行到这段代码,获得c的值为1,这个时候线程一释放了锁,state更新为0,但线程二无法进入第一个if,第二个else if也无法进入,只能返回失败。
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//否则判段持有锁的是否为当前线程,如果当前线程正在占有锁,则将state值+1.
//这地方有个第二个疑问,有没有可能当前线程在获得state的值并赋值给c后,其他线程更新了这个state的值,这样nextc得到的是不准确的值。后来想了想不可能,因为若是其他线程持有锁,则不会进入else if这个判段,若是当前线程持有锁,则调用lock()方法后正在执行这段代码的只有本线程,不可能有别处修改state的值。所以使用getState()获得的state值是准确的。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
nextc小于0时抛出一个error,说明线程最大可重入次数为Integer.MAX_VALUE
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
再来看看addWaiter方法:
//以当前线程和给定的模式新建并入队节点,模式分为独占模式和共享模式。
private Node addWaiter(Node mode) {
//因为是独占模式,所以传入的mode为null
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//这句话的意思为先执行一次cas操作尝试以快速的方式将节点入队,如果失败了的话,则执行enq入队方法,循环进行cas入队操作。
//获取队列的尾节点
Node pred = tail;
//如果尾节点为非空,说明队列中已经有节点,则将新入队结点前一个节点设置为队列中的尾节点,cas更新AQS的尾节点tail为node。
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果队列为空或者cas失败,则执行enq。
enq(node);
return node;
}
//使用cas将tail指向新入队的节点。
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
如下为enq方法:
private Node enq(final Node node) {
for (;;) {
//如果队列为空,则初始化头结点尾节点,将头节点设置为一个傀儡node
//这里头尾节点都是延迟初始化,在出现竞争有节点入队时才初始化。
Node t = tail;
//如果队列为空,则先将头尾节点指向一个没有关联线程的傀儡节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如上这段代码揭示了CLH队列究竟是什么样子,如下图。
未命名文件7.png
由此可见CLH队列为一个双向链表,头结点为一个傀儡节点,尾节点的引用指向队列中最后一个node。
这里还有一个疑问,就是为什么要使用cas将tail指向新入队节点呢?待明确
再来看看acquireQueued这个函数
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果队列的前驱结点为头结点,说明当前节点在队列头,则再次执行tryAcquire方法尝试获取锁。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//更新node的waitStastus
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//将当前线程的状态变更为等待状态。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
再来看看shouldParkAfterFailedAcquire这个方法,这个方法的主要目的用来更新节点的waitstatus。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果这个节点的waitStatus已经
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//如果waitStatus为CANCEL,说明其前驱节点对应的线程被中断,跳过这个节点
if (ws > 0) {
//循环,只要前驱结点的waitStatus为CANCEL,则继续向前找,将这些CANCEL的节点从链表上断开,后续会被gc回收。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这段代码的主要目的是将入队了的节点的前驱的waitStatus,如果其前驱节点的waitStatus为0,则cas更新为SIGNAL,如果为CANCAL,则将其从链表上断开。可以结合下面情形来理解:
现在假设线程一、线程二和线程三竞争锁,线程一先竞争锁,线程二的node先入队,进入acquireQueued这个方法后,先tryAquire()尝试能否获得锁,不能则进入shouldParkAfterFailedAcquire(),线程二的前驱节点为head,其waitStatus=0,cas更新其为SIGNAL-1。然后线程三入队,将线程二对应的node的waitStatus更新为-1,并调用lockSupport.park方法阻塞当前线程。最终结果如下图
未命名文件7.png
综上所述,可以知道整个加锁过程中发生了什么操作:当有竞争发生时,整个操作如下:首先cas更新state值为1,并将占用锁的线程设置为当前线程。如果cas失败,则再次获取state的值,如果state的值为0则说明竞争已解除再如上进行一次cas操作更新state为1。如果不为state不为0,则判段占用锁的是否为当前锁,如果是,则state的值+1。如果不是,则将当前线程入队,入队时每次节点插入到队列的末尾,然后使用cas更新尾节点指向新入队的节点。然后修改节点前驱节点的waitStatus为SIGNAL,并将当前结点阻塞。从而完成整个加锁过程
那么解锁过程又发生了什么呢?
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
如果队列中有node的话,那么head的waitStatus为-1.
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
其中release()方法定义在AQS类中,tryRelease方法需要继承AQS的类自己实现。
protected final boolean tryRelease(int releases) {
//这里和state的累加
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
因为调用一次lock()就要相对应的调用一次unlock(),所以没调用一次unlock(),state值-1,当state为0时,调用setExclusiveOwnerThread将占有锁的线程置为空。表明已释放占有的锁。然后接着判段队列的head是否为null,waitStatus是否<0,如果head不为null并且waitStatus<0时,说明队列中存在node,则需要调用unparkSuccessor方法唤醒队列第一个node对应的线程。FIFO就体现在这里。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//这里的node为头结点,将头结点的waitStatus设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//如果队列的头节点的线程已经中断了,那么需要从队列尾向前寻找
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
被唤醒的线程将继续执行,该被阻塞线程是调用parkAndCheckInterrupt()阻塞的,代码仍在for循环中,将继续执行。将进入第一个if判段,此时tryAcquire可以成功更新state的值,将会返回true,第一个if成立,将头节点指向下一个队列的下一个节点。被唤醒的线程对应的node将会从双向链表中断开并最终被gc回收。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
//此时被唤醒的线程对应的节点已经出队,将被gc回收
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
看到这里不得不赞叹Doug Lea的鬼斧神工。
故回顾整个解锁过程,首先会cas更新state,没调用一次unlock就会将state的值-1。当state为0时,表示锁已释放,此时将队列中的第一个节点对应的线程唤醒,被唤醒的线程可以成功占有锁,并将该线程从出队,head指向队列中新的第一个节点。
如上为整个ReentrantLock非公平所的加锁解锁过程,公平锁的加锁过程会在之后更新。
网友评论