ReentrantLock和AQS在concurrent库中的package路径ReentrantLock是java.util.concurrent.locks包里的一个lock实现,顾名思义该锁是可重入锁,可重入的最大次数为Integer.Max,超出则抛出Error。
ReentrantLock内聚一个基于AbstractQueuedSynchronizer的同步器,Lock接口里的方法都是基于这个同步器实现的。同步器有两个内部实现,一个是公平同步器,一个是不公平同步器。公平性即等待锁时间越长的线程获取锁的优先级越高,但ReentrantLock默认是采用不公平同步器,我的理解是大多应用场景无需考虑公平性,也就是线程因竞争锁多阻塞一段时间是可接受的。只要对锁的使用得当,每个线程最终都能成功获取锁。公平性通过构造方法也是可以自行指定的。
AbstractQueuedSynchronizer(简称AQS)同样如同其名,是一个抽象的基于CLH队列的同步器,它定义了一套多线程访问共享资源的同步器框架。许多其它的同步类实现也都依赖于它,如常用的Semaphore、CountDownLatch等。同时,它提供两种模式的锁抽象:共享模式和独享模式;共享模式类似读写锁。Semaphore和CountDownLatch是共享模式的实现,而ReentrantLock是独享模式的实现,因此本文只阅读有关独享模式的代码。
ReentrantLock除了实现了Lock接口外,还定义了一系列的public和protected方法,用于检测和监控lock的state,如public getHoldCount()获取重入次数,protected getQueuedThreads()获取因竞争锁而阻塞的线程名称列表。
本文主要分析公平性和Lcok接口的方法
为了达到更好的阅读效果,可先思考如下问题,然后带着问题阅读该文和源码~
编号 | 问题描述 |
---|---|
1 | ReentrantLock如何实现重入的?如何维护重入的数值? |
2 | ReentrantLock的公平性和非公平性体现在哪些指标上,如何实现的? |
3 | 多个线程同时调用lock()方法,一个线程会成功,其他线程会阻塞,阻塞是如何实现的?即lock()的工作原理? |
4 | 调用unlock()方法将会唤醒一个因lock()而阻塞的线程,唤醒如何实现的? 即unlock()的工作原理? |
5 | Lock接口类中lock()、lockInterruptibly()、tryLock()这三个方法的异同? |
6 | Condition类作用、工作原理以及如何使用? |
7 | AQS的大致工作原理?CLH队列是如何工作的? |
1. ReentrantLock类结构
ReentrantLock类的内部结构相对还是比较简单的:
- 两个内部类实现了两个基于AQS的同步器:一个是公平的, 一个是不公平的。
- 实现了Lock接口类,如lock、unlock,tryLock、lockInterruptibly等方法。
- 提供了获取ReentrantLock运行状态的各种方法,如getHoldCount()、getQueuedThreads()等。
不知道如何调整代码里注释的颜色,默认的颜色看着很费眼,哪位知道?欢迎评论区留言指点~
/**
* 内部类
**/
class Sync extends AbstractQueuedSynchronizer {...}
class NonfairSync extends Sync {...}
class FairSync extends Sync {...}
/**
* 成员变量
**/
Sync sync;
/**
* Lock接口里方法
**/
lock() {..}
lockInterruptibly() {..}
tryLock() {..}
tryLock(long timeout, TimeUnit unit) throws InterruptedException {..}
unlock() {..}
newCondition() {..}
/**
* 其他监控方法,这里只列出其中两个,其它请查阅源码
**/
getHoldCount() {..}
getQueuedThreads() {..}
2. 公平性与非公平性
ReentrantLock提供了两个构造方法,默认构造方法使用非公平同步器,因为大部分业务场景发生锁竞争时,未获取锁的线程多阻塞一会儿是可以接受的,既不要求实时也不要求有序,而且非公平同步器性能会好一些;使用者可通过构造方法指定公平性。
public ReentrantLock() {
sync = new NonfairSync();
}
//通过构造方法可以指定公平性
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
非公平同步器性能为啥好?阅读lock()源码时,会回答这个问题。我们先继续阅读公平同步器和非公平同步器有哪些不同?通过源码对比,在获取锁的执行流程中都会执行tryAcquire,公平性的tryAcquire只比非公平性的tryAcquire多了一个判断条件!hasQueuedPredecessors(),仅此一行的区别。这个区别就是,非公平性的实现在竞争锁时,无需判断阻塞队列里是否已经有其他线程也在竞争锁,直接插队拿锁走人~
/**
* 公平版本的tryAcquire. 只有当没有阻塞线程或队列为空时,可以继续申请锁
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* 非公平版本的tryAcquire。
* 这个实现不是在NonfairSync类里,而是在Sync里
* 因为无论当前使用哪种同步器,Sync里的tryLock()都是调用的nonfairTryAcquire(1)
* 因此把nonfairTryAcquire的实现放到基类Sync里
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
hasQueuedPredecessors方法里的tail和head是AQS里的成员变量,下个章节会介绍AQS如何维护tail、head和CLH队列的。下面列出一个关于ReentrantLock公平性的测试程序,通过测试结果可以看出公平锁是按照锁的申请次序依次分配的,而非公平锁则是随时可能被强占的。
非公平性锁的测试结果如下图,当阻塞队列里有线程时,新申请锁的线程会直接插队抢走锁。
非公平性的锁的测试结果
公平性锁的测试结果如下图,当阻塞队列里有线程时,新申请锁的线程会追加到队列的尾部。
公平性锁的测试结果
测试的代码如下
public class FairOrUnfairLockTest {
private Lock fairLock = new MyReentrantLock(true);
private Lock unfairLock = new MyReentrantLock(false);
/**
* 公平锁测试
*/
public void testFairLock() {
for (int i = 0; i < 3; i++) {
Thread thread = new MyThread(new Job(fairLock), i + "");
thread.setName("" + i);
thread.start();
}
}
/**
* 非公平锁测试
*/
public void testUnfairLock() {
for (int i = 0; i < 3; i++) {
Thread thread = new MyThread(new Job(unfairLock), i + "");
thread.start();
}
}
public static void main(String[] args) {
FairOrUnfairLockTest fairOrUnfairLockTest = new FairOrUnfairLockTest();
fairOrUnfairLockTest.testFairLock();
// fairTest.testUnfairLock();
}
public static class MyReentrantLock extends ReentrantLock {
public MyReentrantLock(boolean fair) {
super(fair);
}
@Override
public String toString() {
List<Thread> waitThreads = Lists.newArrayList(getQueuedThreads());
Collections.reverse(waitThreads);
Thread o = this.getOwner();
return ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "] ") +
"[Waited by threads " + waitThreads + "]";
}
}
public static class MyThread extends Thread {
public MyThread(Runnable target, String name) {
super(target, name);
}
public String toString() {
return getName();
}
}
private static class Job implements Runnable {
private Lock lock;
public Job(Lock lock) {
this.lock = lock;
}
public void run() {
/**
* 释放锁后立即再次申请锁
*/
for (int i = 0; i < 2; i++) {
lock.lock();
System.out.println(lock.toString());
try {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 2 * 1000) {
}
} finally {
lock.unlock();
}
}
}
}
}
3. lock执行流程
本文将以非公平锁的lock流程为例来分析lock执行流程。先看ReentrantLock类里的lock方法,只有一行代码,执行NonfairSync里的lock方法;
public void lock() {
sync.lock();
}
接着看NonfairSync里的lock方法,如果锁可用直接拿走锁返回,否则调用AQS里的acquire方法。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
接着看AQS里的acquire方法。有两个判断条件,执行逻辑是:先尝试获取锁tryAcquire,成功则返回;如果失败,从获取插入阻塞队列acquireQueued;最后会调用自我中断;
AQS :: public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
接着看NonfairSync里的tryAcquire方法,上个章节提到非公平性锁的性能会更好些。这里给出分析,tryAcquire在锁可用时,无需检查阻塞队列是否为空,会插队抢走锁。如果加上这个检查(公平同步器的实现),当阻塞队列不为空时,总会执行AQS里的acquireQueued方法,acquireQueued会创建阻塞节点,然后进入阻塞 > 唤醒流程,而这个流程要从用户态切换到内核态,相对比较耗时。正是因为非公平同步器很多锁申请可以跳过这个流程,因此性能会更好些。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
上面已经提到acquireQueued会创建阻塞节点,然后进入阻塞 > 唤醒流程,代码如下:
AQS :: public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 创建阻塞节点并插入到阻塞队列,然后进入阻塞 > 唤醒流程
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
AQS :: final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 进入阻塞 > 唤醒流程
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
执行acquireQueued时会执行addWaiter(Node.EXCLUSIVE), arg),它的作用是把当前线程加入到阻塞队列里;因此在分析acquireQueued方法前,先阅读addWaiter方法和Node数据结构。addWaiter方法相对简单,不做详细解读~
AQS :: private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Node数据结构,其中waiterStatus最为关键,阻塞、唤醒、中断、取消时会更改这个值。
AQS :: static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//取消状态
static final int CANCELLED = 1;
//后继节点需要被唤醒 LockSupport.upark()
static final int SIGNAL = -1;
//处在某个condition的wait状态 condition.await()
static final int CONDITION = -2;
// 共享模式会用到,这里略
static final int PROPAGATE = -3;
// 节点状态,上边已经给出简要的解释,建议阅读一下源码里详细注解;
volatile int waitStatus;
// 前驱
volatile Node prev;
// 后继
volatile Node next;
// 阻塞线程
volatile Thread thread;
// Link to next node waiting on condition, or the special value SHARED
Node nextWaiter;
类CLH队列的AQS阻塞队列
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
Parker * FreeNext ;
JavaThread * AssociatedWith ; // Current association
public:
Parker() : PlatformParker() {
_counter = 0 ;
FreeNext = NULL ;
AssociatedWith = NULL ;
}
}
class PlatformParker : public CHeapObj<mtInternal> {
protected:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
...
}
class PlatformParker{
void Parker::park(bool isAbsolute, jlong time) {
// Ideally we'd do something useful while spinning, such
// as calling unpackTime().
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// Optional optimization -- avoid state transitions if there's an interrupt pending.
// Check interrupt before trying to wait
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
if (time > 0) {
unpackTime(&absTime, isAbsolute, time);
}
// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both the
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt);
// Don't wait if cannot get lock since interference arises from
// unblocking. Also. check interrupt before trying wait
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status ;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
}
lock()的执行时序图如下图。其中HotSpot JDK里的os_linux.hpp文件有个PlatformParker类,PlatformParker类里的的park()和uppark()方法提供操作系统级别的线程阻塞和唤醒;
lock()执行流程
lock执行流程2(转自https://blog.csdn.net/luonanqin/article/details/41871909)
4. tryLock执行流程
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
5. unlock执行流程
public void unlock() {
sync.release(1);
}
AQS :: public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
/**
waitStatus!=0表示处于CANCEL(1) 或SIGNAL(-1) 或CONDITION(-2) 或CONDITION(-3)
也就是说waitStatus不为零表示它的后继在等待唤醒。
**/
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
AQS :: private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
unlock()执行流程(转自https://blog.csdn.net/luonanqin/article/details/41871909)
6. lockInterruptibly()执行流程
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
7. newCondition()
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
}
8. 总结
之前提到的7个问题是否有了答案?希望读者读了本文后能对ReentrantLock的使用有了更准确的把握~
编号 | 问题描述 |
---|---|
1 | ReentrantLock如何实现重入的?如何维护重入的数值? |
2 | ReentrantLock的公平性和非公平性体现在哪些指标上,如何实现的? |
3 | 多个线程同时调用lock()方法,一个线程会成功,其他线程会阻塞,阻塞是如何实现的?即lock()的工作原理? |
4 | 调用unlock()方法将会唤醒一个因lock()而阻塞的线程,唤醒如何实现的? 即unlock()的工作原理? |
5 | Lock接口类中lock()、lockInterruptibly()、tryLock()这三个方法的异同? |
6 | Condition类作用、工作原理以及如何使用? |
7 | AQS的大致工作原理?CLH队列是如何工作的? |
参考:
lock和unlock https://blog.csdn.net/luonanqin/article/details/41871909
LockSupport park和unpark https://blog.csdn.net/hengyunabc/article/details/28126139
查看OpenJdk native方法实现 https://blog.csdn.net/kelindame/article/details/44625255
Unsafe类 https://www.cnblogs.com/pkufork/p/java_unsafe.html
本文为本简作者原创,无版权问题喜者可转~ 功力有限,有任何错误欢迎指正~
网友评论