ReentrantLock 作为常用的多线程下锁的一种实现类,常和synchronized进行比较,本篇主要介绍ReentrantLock常用方法的源码实现。
架构总览
先贴一下类的整体结构:
源码介绍
里面有几个关键类:
AbstractQueuedSynchroinzer
Sync
FairSync:公平锁实现
NonfairSync:非公平锁实现
AQS 做为JUC包内的核心类,定义了一系列的实现乐观锁的规范及实现。如
tryAcquire:定义尝试获取锁的方法定义,由不同的实现类进行实现。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
acquire:定义了独占模式下线程如何入队。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantLock内部通过Sync实现了AQS的方法定义,并提供了常用获取锁的方法,下面主要介绍下常用方法的实现过程。
tryLock()
/**
* Acquires the lock only if it is not held by another thread at the time
* of invocation.
*
* <p>Acquires the lock if it is not held by another thread and
* returns immediately with the value {@code true}, setting the
* lock hold count to one. Even when this lock has been set to use a
* fair ordering policy, a call to {@code tryLock()} <em>will</em>
* immediately acquire the lock if it is available, whether or not
* other threads are currently waiting for the lock.
* This "barging" behavior can be useful in certain
* circumstances, even though it breaks fairness. If you want to honor
* the fairness setting for this lock, then use
* {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) }
* which is almost equivalent (it also detects interruption).
*
* <p>If the current thread already holds this lock then the hold
* count is incremented by one and the method returns {@code true}.
*
* <p>If the lock is held by another thread then this method will return
* immediately with the value {@code false}.
*
* @return {@code true} if the lock was free and was acquired by the
* current thread, or the lock was already held by the current
* thread; and {@code false} otherwise
*/
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
里面只有一行代码,如果在new ReentrantLock时没有指定参数,或指定为false,默认使用非公平锁实现。这里的参数1指每lock一次,计数器加1。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
先看下非公平锁中的nonfairTryAcquire实现:
state是ReentrantLock中的volatile全局参数,用于标记当前锁是否已被抢占,默认是0,每抢占一次增加acquires次(一般都是1次)。
compareAndSetXXX是以乐观锁cas的方式,尝试修改一个变量的值。本质是调用操作系统cmpxchg指令,通过与期望值进行比较,相同则修改,不同则不修改。一般配置while(true)使用。
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//state是一个全局的volatile参数
int c = getState();
//如果c是0,则表示没有线程占有锁,可以尝试抢占
if (c == 0) {
if (compareAndSetState(0, acquires)) {
//如果抢占成功,则设置拥有独占锁的线程是current线程
setExclusiveOwnerThread(current);
return true;
}
}
//如果不是0,则代表锁已经被抢占,此时判断抢占锁的线程是不是当前线程本身
else if (current == getExclusiveOwnerThread()) {
//如果是重入,则增加重入的次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//因为已经是当前线程占有锁,不需要再通过cas修改
setState(nextc);
return true;
}
// 如果发现其他线程已经占有锁,则返回false
return false;
}
lock()
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
lock方法里只有一个if判断,首先尝试获取锁,成功则已,不成功就进入acquire。
acquire()
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
只有几行的方法,全部都是方法的封装。格式化一下写法
public final void acquire(int arg) {
if(!tryAcquire(arg)) {
Node node = addWaiter(Node.EXCLUSIVE);
boolean interrupted = acquireQueued(node, arg);
if(interrupted) {
selfInterrupt();
}
}
}
tryAcquire()的非公平锁的实现已经在上面介绍过了,如果是公平锁的实现
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //就多了这么一个区别
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
hasQueuedPredecessors() 就多了这么一个判断,也就是如果等待队列中有线程在等待,就不再尝试抢占,直接返回false,后续也加到等待队列中。
如果没抢到锁,则将当前线程添加到等待队列中,也就是addWaiter方法。
addWaiter()
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 如果队列不为空,先快速尝试一次入队到尾节点,如果没成功就进入完成的enq调用。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter方法首先把当前线程封装为Node对象,且初始时head和tail节点都为null,如果是第一次调用,会先进入到enq方法中。
通过for循环始终确保当前node可以入队。
如果是第一次入队,会new一个空的node。head和tail都指向这个空节点。
如果是队列中已有,则把node作为添加到队列最后,tail指向node,node和前一个组成双向链表。
前一个 ---next---> node
前一个 <--prev--- node
tail -----> node
入队后,至此addWaiter方法完成。
acquireQueued()
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果线程被中断,需要返回中断标示,
//因为parkAndCheckInterrupt里的Thread.interrupted()会复位标示。所以在这里通过变量返回。
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued 主要用于tryAcquire或者park。
首先判断当前node的的前一个节点是不是头节点,如果是,且获取锁成功,则把自己设置成head并返回。
否则进入shouldParkAfterFailedAcquire方法。
注意:这里的interrupted标志是否被中断过,因为内部的中断已经被Thread.interrupted()复位,通过此变量返回到上层方法,调用selfInterrupt重新设置一次中断。
目的是如果业务的代码中需要针对是否中断做逻辑处理,则通过selfInterrupt来再次触发中断来通知业务。
shouldParkAfterFailedAcquire
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire方法主要判断当前node能否被park。
被park的前提是node的pred节点的waitStatus必须是Signal状态。
parkAndCheckInterrupt
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如果shouldParkAfterFailedAcquire返回true,则可以park。当被唤醒时,返回线程是否被中断过。
至此,主要的lock方法已经说明完成。
unlock()
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unlock 的处理比较简单,通过tryRelease来判断重入锁是否已经全部释放完成。全部释放则开始unpark后继节点。
unparkSuccessor
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
先将ws变量设置成0,防止重复唤醒。再判断next节点是否已经被取消,如果取消了。就从tail开始,倒序遍历,找到waitStatus<=0的node,unpark。
cancelAcquire()
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
最后说明下node是如何cancel的。
- 先判断pred节点是不是也被cancel了。如果是就一直向前找到没有cancel的节点。
- 如果当前节点已经是tail了,则将pred.next = null;
- 如果当前节点不是tail,且
3.1 pred节点不是head, 且
3.2 pred的ws是signal 或 pred的ws可以修改成signal
3.3 当前节点的next也没有被取消
以上条件都满足,就将pred.next = node.next 上。
否则,唤醒node的next节点,即unparkSuccessor。unparkSuccessor内部有处理,如果next是null,从tail开始倒序依次唤醒。
网友评论