平时在写java代码时,大家都习惯用synchronized
关键字或ReentrantLock
实现同步锁,相比synchronized
,ReentrantLock
功能更强大,提供了诸如可超时锁、可中断锁等特性,此处记录下最近所学ReentrantLock
笔记。
ReentrantLock
采用CAS
机制实现,该类提供的主要接口有lock
、unlock
等方法。要想了解ReentrantLock
的实现原理,我们首先得看明白AbstractQueuedSynchronizer
这个类,该类内部主要实现了一个双向列表用于保存抢锁的线程节点,以及一个实现了Condition
接口的ConditionObject
类,ReentrantLock
实现的newCondition
方法及返回了该类的一个实例对象,关于Condition
对象的作用稍后记录,我们先看下AbstractQueuedSynchronizer
的实现源码:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
Node nextWaiter
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
Class Node
是AbstractQueuedSynchronizer
类的内部类,可以看出来该类是用于保存Thread以及Thread一些状态的双向队列,以及一个nextWaiter
的Node
对象,该对象是用于保存下一个idle的节点,会在ConditionObject
这个类中用到,此处稍后讲解;除Class Node
外另外还有三个重要的对象:
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
前两个字段一看就明白实现了一个队列,分别为队列的头节点及尾节点,state
字段是最重要的一个字段,即为锁的状态,线程抢占锁即抢先修改该字段的值,state
非零代表锁已被其他线程抢占,为零代表目前没有线程占用该锁,修改该字段的方法即采用了CAS
机制实现,该方法修改state
值成功则当前线程抢锁成功,否则抢锁失败,源码如下:
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
另外AbstractQueuedSynchronizer
类实现了一些基本的入队列、出队列等方法,再次不做说明;接下来我们看下该类实现的获取锁的方法acquire
:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法为阻塞方法,基本逻辑为先调用tryAcquire
方法尝试获取锁,如果获取不成功,则将当前线程封装成Node
对象入队列(addWaiter
这个方法实现),然后一直等待当前线程节点出队列并成功抢到锁(acquireQueued
方法实现),同时在acquireQueued
方法中如果首次抢锁失败会尝试将当前线程挂起已节约资源,参看后面该方法的讲解;如果当前线程被唤醒过,代表当前线程被park过,则调用selfInterrupt
方法重新设置线程interrupt状态为interrupted(ps:本人感觉这块有点绕口,其实该方法就是唤醒线程并将interrupt设置为true,大家看下Thread
类interrupt
方法就明白了,再此不做说明)。tryAcquire
方法由具体的实现类实现,稍后介绍ReentrantLock
类时再看下ReentrantLock
类依据该方法分别实现公平锁及非公平锁,addWaiter
及acquireQueued
方法实现源码如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
其中shouldParkAfterFailedAcquire
方法判断如果抢锁失败是否需要将当前线程park掉,即停止当前线程的调度,这样可以节省资源,直到当前节点出队列时由其他释放掉锁的线程唤醒该线程继续抢占锁;如果抛出异常,则将node节点从队列中移除并unpark下一个节点线程;另外该类也实现了超时、可中断模式抢锁方法,超时模式即每次循环都判断下是否超时,超时则返回;可中断模式即如果线程中途被中断,则抛出中断异常。
获取锁的基本原理就是这些了,然后我们看下如何释放锁:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放锁很简单,首先调用tryRelease
方法将锁释放,即修改state字段值以及设置exclusiveOwnerThread
对象为null
,exclusiveOwnerThread
对象为当前占有锁的线程,该方法由具体类实现,稍后我们看下ReentranLock
类如何实现;锁释放后如果队列中还有抢锁的线程节点,则unpark
下一个线程节点参与抢锁。
如何抢锁及释放锁大致讲明白了,现在我们来看下ReentrantLock
具体如何实现公平锁及非公平锁的,首先盖类中定义了一个Sync
对象:
private final Sync sync;
Sync是ReentrantLock类的一个内部抽象类,大致实现源码如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
}
可以看到Sync
类继承了AbstractQueuedSynchronizer
这个类,同时实现了一个非公平获取锁方法及释放锁方法,所谓非公平获取锁就是直接参与抢锁,而公平获取锁则会按照线程节点入队列的顺序参与抢锁,同时提供了一个获取Condition对象的方法,这个稍后再讲,我们先通过nonfairTryAcquire
方法看下ReentrantLock是如何实现可重入锁的:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//如果当前没有线程占有该锁,则直接抢锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//如果有线程占有该锁并且该线程为当前线程,则将state值增加,即实现了可重入锁
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
然后我们看下另外两个分别实现公平锁及非公平锁类:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
该类实现很简单,直接抢锁compareAndSetState(0, 1)
,将state
值加一;如果失败则调用AbstractQueuedSynchronizer
类实现的acquire
方法,尝试将state
值加一,而acquire
方法又会调用nonfairTryAcquire
非公平抢锁方法抢锁;如果还是失败则将当前线程节点入队列(参看AbstractQueuedSynchronizer
类的acquireQueued
方法)并尝试park
当前线程已节约资源,直到当前线程被unpark
(在AbstractQueuedSynchronizer
类的release
方法中)。
公平锁的实现也很简单,就是按队列中先来先到的优先顺序抢锁,实现源码如下:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
可以看到FairSync
类实现的尝试获取锁方法tryAcquire
会先判断下队列中是否存在线程节点(hasQueuedPredecessors
方法),如果不存在再去抢锁,否则将当前线程节点入队列(参看AbstractQueuedSynchronizer
类的acquireQueued
方法)并尝试park
当前线程已节约资源,直到当前线程被unpark
(在AbstractQueuedSynchronizer
类的release
方法中);同时也实现了可重入锁。
ReentrantLock
实现的lock
、unlock
等方法即调用了Sync
对象实现的lock
、release
方法:
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
至此,ReentrantLock
类所实现的加锁及释放锁机制就讲的差不多了,另外还剩下一个ConditionObject
对象,这个东西是干嘛的呢?ConditionObject
类是AbstractQueuedSynchronizer
类的内部类,其内部就是一单向链表,用于保存被unpark
掉的线程节点,我们在讲AbstractQueuedSynchronizer
类的内部类Node
类源码时说过,Node
类中定义了一个 Node nextWaiter;
,就是用于该类实现链表用的。但该类最大的作用在于能够park
及unpark
线程,基本实现原理也是调用LockSupport
类的park
及unpark
方法,让线程挂起及唤醒的大致实现源码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
此处可能看不出来该ConditionObject
对象的具体作用,建议大家可以看下LinkedBlockingQueue
的实现源码就明白了,至此ReentrantLock
的实现原理基本讲解完毕,如有错误的地方敬请各位多多指正。
网友评论