什么是ReentrantReadWriteLock
ReentrantReadWriteLock
和ReentrantLock
同样是重入锁,支持公平方式和非公平方式获取锁。
ReentrantReadWriteLock
同时支持共享锁和独占锁。对于共享资源的读写操作而言,我们允许多个线程同时读取,但是同一时刻仅允许一个线程写入,并且写入操作进行的时候其他线程无法读取。ReentrantReadWriteLock
正是这样的一种锁,内部维护了两个锁:读锁和写锁。如下所示:
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
其中读锁是共享锁,写锁是独占锁。
实现方式
ReentrantReadWriteLock
使用同一个AQS队列来处理共享锁和独占锁的请求。
通过上一篇JUC原理之ReentrantLock我们知道AQS的state表示某一线程的重入深度。但是在ReentrantReadWriteLock
中仍然这样记录是不能满足要求的,原因如下:
-
ReentrantReadWriteLock
同时有读锁和写锁两种,那么这两种锁当前持有的个数都要记录 - 读锁是共享锁,可以被多个线程同时持有,但又是可重入锁。这就意味着每个持有读锁的线程的重入深度也需要记录
下面将在代码的分析中逐个说明如何解决上述的两个问题。
为了解决同时存放读锁和写锁个数的问题,ReentrantReadWriteLock
的同步状态(state)这个整型变量分为两部分:
- 高16位用于存放共享锁的持有数量
- 低16位用于存放独占锁的持有数量
如下方代码所示(摘自Sync
类):
// 共享数量偏移量,使用state的高16位存放,自然偏移16
static final int SHARED_SHIFT = 16;
// 共享锁数量的1在state中对应着是几,当然是高16位的最低位+1了
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 共享锁或独占锁的最大持有数量,state变量分成了2部分使用,每种所的持有最大量变为2的16次方-1
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 返回一个低16位全为1的整型
// 和子网掩码用法类似,state和EXCLUSIVE_MASK按位与,可以得到当前独占锁持有的数量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
// 获取当前持有共享锁总数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
// 获取当前持有的独占锁数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
获取共享锁的过程
注意一种特殊情况:允许同一个线程先获取独占锁,没有释放的时候又获取共享锁。
我们分析下tryAcquireShared
方法的实现:
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
// 获取当前线程
Thread current = Thread.currentThread();
int c = getState();
// 如果有线程持有独占锁,并且独占线程不是当前线程,获取锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取共享锁数量
int r = sharedCount(c);
// 判断是否可以获取读锁(共享锁),这个方法在公平锁和非公平锁中的实现不同,稍后分析
// r值必须小于MAX_COUNT,否则会溢出
// CAS尝试读锁个数增加1(高16位)
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果目前没有线程得到共享锁
if (r == 0) {
// 设置firstReader(第一个读线程)为当前线程
firstReader = current;
// 设置firstReader的重入数量为1
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 这个分支含义为第一个读线程为当前线程,并且它再次尝试获得锁(重入)
firstReaderHoldCount++;
} else {
// 这里要更新线程持有的读锁数目(线程的重入深度)
// HoldCounter是一个线程持有锁数量的计数器,保存了线程id和锁计数两个变量
// cachedHoldCounter是最后一个获取共享锁的线程对应的HoldCounter,通常来说释放锁的线程就是上一个获取锁的线程。把这个线程的HoldCounter缓存可以减少ThreadLocal查找的时间
HoldCounter rh = cachedHoldCounter;
// 如果rh为空,或者rh的线程ID不是当前的线程ID,更新cachedHoldCounter为当前线程对应的HoldCounter
// readHolds变量在Sync构造函数中创建,线程第一次访问的时候返当前线程对应的HoldCounter,count值为0
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
// 如果rh为当前线程的cache,并且count为0(线程第一次获取共享锁)
// 记录当前线程的HoldCounter到ThreadLocal
else if (rh.count == 0)
readHolds.set(rh);
当前线程的共享锁数量加1
rh.count++;
}
// 此处获取读锁成功,返回1
return 1;
}
// 如果上面的if条件没有满足,需要走完整的获取共享锁流程
// 上面的步骤只是获取共享锁的快速方法
return fullTryAcquireShared(current);
}
注意:这里有三个变量:firstReader
, firstReaderHoldCount
和cachedHoldCounter
。他们的作用如下:
-
firstReader
,firstReaderHoldCount
只存储第一个获取到共享锁的线程和重入深度,cachedHoldCounter
是最近一次获取到共享锁的线程HoldCounter的缓存。 - 大多数情况而言没有锁竞争,只有一个线程获取锁和释放锁,此时只需要使用
firstReader
,firstReaderHoldCount
即可保存线程的重入次数,无需使用readHolds这个ThreadLocal
类型,提高了运行效率。 - 另一种常见的场景是具有锁竞争的环境,一个线程获取了锁,处理完业务后会紧接着释放锁。此时使用
cachedHoldCounter
可以避免查找ThreadLocalMap
,同样提高了代码运行效率。
HoldCounter
类用于保存每个线程持有的共享锁数量,需要在ThreadLocal中使用。代码如下:
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
readHolds
保存了每一个线程的共享锁数量,它是ThreadLocalHolderCounter
类型。它继承了ThreadLocal
类,代码如下:
/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
在Sync
初始化的时候readHolds
变量也一同被初始化。线程第一次调用readHolds.get()
时候,自动调用initialValue
方法,创建并返回一个当前线程对应的HoldCounter
。
接下来分析完整版本的获取共享锁方法fullTryAcquireShared
,多用于处理CompareAndSetState
失败时候的逻辑(tryAcquireShared
的CompareAndSetState
失败后会执行此方法)。
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
// 自旋
for (;;) {
// 获取当前状态
int c = getState();
// 如果独占锁的持有数量不是0
if (exclusiveCount(c) != 0) {
// 检查持有独占锁的线程是不是当前线程,如果不是返回-1,获取锁失败
// 此处如果持有独占锁的线程是当前线程,方法不会返回
// 意思是允许同一个线程获取独占锁之后又获取共享锁
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// 如果不允许获取共享锁
// 字面上是这个意思,但逻辑上的含义为如果readerShouldBlock()返回true,只允许重入,不允许新线程获得锁
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 如果firstReader不是当前线程
if (rh == null) {
rh = cachedHoldCounter;
// 如果cachedHoldCounter为空或者是cachedHoldCounter的HoldCounter和当前线程不对应
if (rh == null || rh.tid != getThreadId(current)) {
// 获取当前线程的HoldCounter
rh = readHolds.get();
// 如果count为0,从readHolds清除
// 该线程已不持有共享锁,不需要再记录该线程的共享锁数量
if (rh.count == 0)
readHolds.remove();
}
}
// 如果rh的count为0,说明当前线程没有获得共享锁,获取失败返回-1
// 如果rh的count不为0,说明当前线程持有共享锁,继续向下进行,允许再次获取共享锁
if (rh.count == 0)
return -1;
}
}
// 如果共享锁数量为最大值,抛出异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS增加共享锁持有数量
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 如果成功,后面的逻辑和tryAcquireShared方法一致,不再赘述
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
获取独占锁的过程
获取独占锁的入口为tryAcquire
方法,代码如下:
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
// 获取当前线程
Thread current = Thread.currentThread();
int c = getState();
// 获取独占锁数量
int w = exclusiveCount(c);
// 如果有独占锁或者共享锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 如果独占锁数量为0,说明有共享锁,获取独占锁失败,返回false
// 如果独占锁数量不为0,持有锁的线程不是当前线程,说明其他线程持有独占锁。本线程获取独占锁失败,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果获取独占锁之后,独占锁计数大于MAX_COUNT,说明独占锁计数溢出
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 更新独占锁数量
setState(c + acquires);
// 获取独占锁成功
return true;
}
// 如果没有任何锁
// 允许当前线程获取独占锁
// 如果CAS设置state失败,代表获取锁失败,返回false
// 如果不允许当前线程获取独占锁,直接返回false
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 如果成功,设置独占线程为当前线程,返回true
setExclusiveOwnerThread(current);
return true;
}
释放共享锁的过程
释放共享锁的方法为tryReleaseShared
,分析如下:
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 如果当前线程是firstReader
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 如果firstReader持有锁数量为1,设置firstReader为null
if (firstReaderHoldCount == 1)
firstReader = null;
else
// 否则firstReader持有锁数量减1
firstReaderHoldCount--;
} else {
// 获得当前线程对应的HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
// 如果count <=0,需要从readHolds变量中移除此HoldCounter
// 意味着该线程不再持有共享锁,无需再记录重入次数
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 重入数量减1
--rh.count;
}
// 死循环
for (;;) {
// 反复尝试CAS将state共享锁数量减1,成功后返回
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 返回结果为共享锁是否全部释放
return nextc == 0;
}
}
释放独占锁的方法
释放独占锁的方法为tryRelease
,分析如下:
protected final boolean tryRelease(int releases) {
// 如果持有独占锁的线程不是当前线程,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计算新的同步状态
int nextc = getState() - releases;
// 判断独占锁是否全部释放
boolean free = exclusiveCount(nextc) == 0;
// 如果释放完后不再有独占锁,设置独占线程为null
if (free)
setExclusiveOwnerThread(null);
// 设置新的同步状态
setState(nextc);
// 返回独占锁是否全部释放
return free;
}
FairSync
和ReentrantLock
一样,ReentrantReadWriteLock
也有一个Sync
对象。它也有两个子类FairSync
和NonFairSync
,分别代表了共享锁和独占锁的实现。
对于FairSync
而言,设想一种场景:队列中有线程等待独占锁排队,后面跟随多个等待共享锁的线程排队。这时如果有队列外线程需要获取独占锁,必须在这些等待共享锁的线程后排队。
FairSync
的代码如下:
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
// 和ReentrantLock的公平独占锁一样
return hasQueuedPredecessors();
}
// 是否允许没有共享锁的线程获取共享锁(无论返回true或false,已经有共享锁的线程都允许再次获得共享锁,即允许重入)
// 没有独占锁的时候,共享锁是可以直接获取的,所以此时共享锁不会入队列
// 如果有需要获取独占锁的线程排队,或者是获取到独占锁的线程正在执行,后续需要共享锁的线程必须依次排队
// 有一种情况,如果一个线程持有独占锁尚未释放,后续没有其他线程排队,此时hasQueuedPredecessors也会返回false
// 感觉像是允许获取共享锁。但是tryAcquireShared方法先保证了没有独占锁的情况下才会获取共享锁,因此这个问题不存在
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
NonFairSync
如果队列中有等待独占锁的线程排队,后面跟随多个等待共享锁的线程排队。队列中第一个线程会和队列外线程抢占独占锁。如果队列外线程抢占失败,进入队列排队(位于等待共享锁的线程之后)。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
// 永远允许获取独占锁
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
// 这个方法用来防止等待获取独占锁的线程饥饿
// 后面详细介绍
return apparentlyFirstQueuedIsExclusive();
}
}
问题:apparentlyFirstQueuedIsExclusive
和hasQueuedPredecessors
在行为上会带来什么区别?
比方说AQS队列中的node为:
独占,共享,共享,独占。
第一个独占锁释放后,紧随的两个共享锁未来的及唤醒的时候,如果又有队列外线程要求获得共享锁,会抢在队列末尾的独占锁之前(apparentlyFirstQueuedIsExclusive
返回false,因为这时候队列head之后的node为共享)。这点和公平锁的行为是不同的(hasQueuedPredecessors
方法在这种情况下返回的是true)。
apparentlyFirstQueuedIsExclusive
方法的代码如下:
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// 如果head节点的下一个节点是独占类型节点,并且不是空节点(s.thread != null)返回true
// 对于共享锁,可以直接获取,无需进入等待队列
// 所以说,如果有线程要求独占锁进入队列排队,它一定是紧随head之后的节点
// 如果返回true,说明有线程需要获取独占锁
// 为了需要独占锁的线程避免线程饥饿,在此之后需要获取共享锁的线程,不允许直接获取到锁,必须排队等待之前排队的独占锁获取并释放之后才能恢复运行
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
本文为原创内容,欢迎大家讨论、批评指正与转载。转载时请注明出处。
网友评论