概述
引入
我们之前说过concurrent
包相对synchronizer
关键字有很多优点,比如:更完备的监控方法和更细粒度的控制。我们在控制对临界变量的访问时,有时会有以下场景:
- 临界变量读取它的线程很多或者读取它的频率非常高
- 修改修改临界变量的操作频率很低
读线程之间是不会互相影响的,此时,如果我们能进行更细粒度的划分,那么在此类场景下,就可以获得更好的访问效率。
摘要
本文主要对ReentrantReadWriteLocal
中的重要方法进行了介绍,因为之前对AQS
、ReentrantLock
、TreadLocal
进行了详尽的介绍,所以我们这里主要对源码的关键方法进行分析,并在实现原理解析中从宏观上对ReentrantReadWriteLock
进行分析。
类介绍
类定位介绍
ReentrantReadWriteLock
实现了对同一资源的独占、共享的两种访问模式的协调。正式因为此锁对线程更细粒度的划分,可在某些情况下获得更好的操作效率。
注意
ReentrantReadWriteLock
本身就存在一定的资源开支,如果不是读操作明显频繁、读线程数量明显多的场景,尽量不要用此锁,否则可能适得其反。
此锁本身像ReentrantLock
一样提供了公平/不公平两种策略。注意一下,和之前一样,公平不是纯粹的公平,仍然存在抢占的场景
源码解读
类结构分析
照例,此类的核心实现仍然是继承了AQS
的内部类同步器。我们主要介绍这里定义的Sync
,NonfairSync
,FairSync
,然后对类中定义的一些方法做一下大概介绍即可。
同步器Sync
此同步器和ReentrantLock
中的同步器同名,但是实际是不太一样的,主要是因为此同步器同时支持共享、独占两种模式的线程,所以做了一些额外的处理
读写模式的相关计数及基本操作
/*
* 读写两种模式抽出来的一些常量和方法,锁状态被分成了两截,int是32位的:
* * 高16位表示共享模式共计获得锁的次数【所有读线程重入次数之和】
* * 低16位表示独占模式共计获得锁的次数【重入次数】
*
*/
// 获得共享模式部分需要移动的位数
static final int SHARED_SHIFT = 16;
// 共享模式下操作的最小单位【就是重入次数——1】
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 两个模式下各自允许存的最大重入次数
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 获得互斥模式重入次数的掩码【就像网段掩码一样】
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 获得共享模式线程重入次数总计 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 获得互斥模式线程重入次数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 判断锁是否被独占线程占有
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 获得以独占模式获得锁的线程
final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}
// 获得读锁的获取次数
final int getReadLockCount() {
return sharedCount(getState());
}
// 获得是否写锁被占据
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
// 获得写锁重入次数
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
// 获得当前线程对读锁的重入次数
final int getReadHoldCount() {
// 如果读锁没有启用,直接返回 0
if (getReadLockCount() == 0)
return 0;
// 参见后面的内部优化介绍
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
内部优化相关操作
/**
* 定义了一个数据结构,用来记录Thread和Thread对读锁重入的次数【主要用作线程变量】
*
* 之所以定义数据结构是因为把 threadId 和重入次数搞到一起了,而为什么要存线程相关的变量,是因为在Sync
* 中做了一些count的缓存,是缓存在对象中不是线程变量中,所以为了比较,线程变量中必须记录下线程相关东西
*
*
* 用long类型的threadId而非Thread引用是为了防止因此处引用,导致线程执行完成后 Thread 的 domain 无法
* 被释放。导致内存出现泄漏【当然,jvm有没有做对应的优化我们后面阅读了jvm再说,目前来看
* 的话至少直接引用 Thread 是不好的】,而且,我们不用 Thread 的那么多东西,所以不用直接用引用
*/
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
/**
* 使用上面的数据结构定义的线程级变量,注意每次初始话都要new一下,原因看之前的ThreadLocal
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/**
* 线程级变量,在创建 Sync 实例时初始化
*/
private transient ThreadLocalHoldCounter readHolds;
/**
* 最近一次成功获得读锁的线程的数据,根据局部性原理,这个缓存可能会启到一定的优化效果
* 同理,使用 tid 而非线程引用,避免对垃圾回收造成一些困扰
*/
private transient HoldCounter cachedHoldCounter;
/**
* 第一个把锁从不是读锁转化成读锁的线程,一种缓存,用来做优化。【讲真,我觉得这俩的优化不一定有用,
* 反而给代码造成了不小的冗余】
*
* 不懂为什么用引用,官方的解释是在正经的释放操作中会解除这里的引用。【但是如果是线程问题自己退了没
* 释放,就不好了】
*/
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
定义公平决策相关判断接口
/**
* 由子类根据同步队列中的情况和各自的公平策略进行实现
*
* 返回 true 表示此读线程在获取锁时应该阻塞
*/
abstract boolean readerShouldBlock();
/**
* 由子类根据同步队列中的情况和各自的公平策略进行实现
*
* 返回 true 表示此写线程在获取锁时应该阻塞
*/
abstract boolean writerShouldBlock();
根据业务实现AQS
未定义的方法
/*
* 根据场景补全 AQS 空出来的方法,主要是共享、互斥模式下的try方法
*
* 注意,因为入参的 int 包含两种信号量: 同步状态和独占状态,我们对信号量修改时统一做修改
*/
// 独占、释放
protected final boolean tryRelease(int releases) {
// 同步线程正在占有锁,此函数的调用者有问题
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 直接减,把同步量一起操作了
int nextc = getState() - releases;
// 根据信号量判断锁是否释放,如果释放就把指向缓存线程的指针置空
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 修改状态量并返回
setState(nextc);
return free;
}
// 独占、获得
protected final boolean tryAcquire(int acquires) {
/*
* 流程:
* 1. 如果读线程或着其他写线程占有锁,失败
* 2. 如果是重入,且重入次数没爆炸,则获得
* 3. 正常竞争,让 writerShouldBlock() 调节一下,其他没啥大变化
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
// 共享、释放
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果 firstReader 缓存的是本线程,修改 firstXXX 的计数
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 否则将此线程的线程变量装进 cachedHoldCounter ,然后修改计数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 最后修改读锁的计数,因为读锁是多线程操作,可能会存在竞争引起的修改失败,故用循环保证
// 减少一次重入的操作能同步到读锁中
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 释放读锁其实没啥变化,除非你释放的是最后一个占有读锁的线程,那样整个锁就都释放开了
return nextc == 0;
}
}
private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}
// 共享、获得
protected final int tryAcquireShared(int unused) {
/*
* 流程:
* 1. 如果写锁被其他线程占有,则失败【这里强调其他线程是因为“锁降级”的存在】
* 2. 剩下的情况就是锁没有被占有或者被读线程占有。在调用 readerShouldBlock()谦让一下之后直接
* 搞起,如果失败,则原因如下:
* a). readerShouldBlock()告诉你要阻塞等一下
* b). 总重入次数饱和了
* c). 修改信号量失败了,有读线程先插手了
* 3. 如果2失败了,用循环去获取
*/
Thread current = Thread.currentThread();
int c = getState();
// 写锁被其他线程占有,直接失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 获得锁成功
if (r == 0) {// 获得锁之前锁没有被站,则用 firstXXX 存储此线程的重入次数
firstReader = current;
firstReaderHoldCount = 1;
}
// 如果不是竞争到锁的,直接找到线程对应的重入数据变量,加一个代表共享重入次数的"1"
else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;//返回正数
}
return fullTryAcquireShared(current);//如果获取失败,循环重新获取
}
/**
* 上面情况2获取失败,循环重新获取
*/
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) { // 上面情况1发生了,直接失败
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// 在此线程没获得读锁的情况下,readerShouldBlock() 说你该阻塞,那就别循环较劲了
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// readerShouldBlock() 说你该搞或者你只是想重入一下子
// 如果读锁同步量满了,直接抛错,别往上挤了
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 条件都还好,那就是最开始修改信号量时被人捷足先登了,再试着改一下
// 成功就成,失败就继续试
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
为具体的读/写锁封装的方法
/**
* 写锁用,不受 writerShouldBlock() 节制
*/
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
/**
* 读锁用,不受 readerShouldBlock() 节制
*/
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
抢占同步器NonfairSync
此处因为要抢占,所以只要不是事实不可为,那就拼一下:
/**
* Nonfair version of Sync
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // 这里直接让你争,不用管队列
}
final boolean readerShouldBlock() {
/*
* 不用管队列中的读线程。但是为了防止写线程被延迟太多,特别照顾一下。
* 毕竟引用场景是读线程一大把,写线程很少,竞争到的概率本来就少,再不照顾一下,写线程的效率
* 就太低了
*/
return apparentlyFirstQueuedIsExclusive();
}
}
排队同步器FairSync
这里就要照顾到队列了
/**
* Fair version of Sync
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
读锁
传入外面的ReentrantReadWriteLock
进行构建,所有的lock/unlock
全部依靠入参的sync
的同步锁操作完成。
不支持Condition
相关方法,因为不是独占
写锁
传入外面的ReentrantReadWriteLock
进行构建,所有的lock/unlock
全部依靠入参的sync
的独占锁操作完成。
基于Sync
支持Condition
相关方法
类内其他方法
都是一些监视器用的方法,不再概述
使用示例
没啥可详细说的,太简单。
唯一要单独提一句的是:有时我们在写操作完成后要立刻读一下,这时继续用写锁有点浪费,但是释放的话,再获得锁,不一定能立刻竞争到,所以我们提供了锁降级操作,即——占有写锁-->写操作完成-->占读锁-->释放写锁-->读操作完成-->释放读锁
实现原理解析
其实我们发现读写锁的实现原理很简单,就是通过同步队列获得锁的同时,我们维护一个信号量并通过此信号量处理好两类锁的竞争关系即可。但是还有一些疑问:
在读操作的线程中维护重入次数是否必须
我认为是不必须,我们只需要维护总的重入次数,在操作合规的情况下,锁的运行状态是正常的【即在finally
块中记得及时释放锁】。
我认为加这些东西是为了方便监控吧。
在读锁的操作中维护了多个缓存变量firstXX
,cached...
,是否合理
我认为不合理,如果你真的认为使用此类的场景是读操作的线程足够多读的足够频繁,那么你的这几个缓存起到的作用就越少。
而且在维护的firstReader
本身就是对线程的引用,和你前面专门用final long tid = getThreadId(Thread.currentThread())
记录线程的初衷是相违背的
注意事项
注意一下,找合适的场景即可。
总结及后续学习路线展望
至此,我们对concurrent
包的学习画上了一个完美的句号,后面我们会
- 继续把多线程的相关知识好好捋一遍
- 带着多线程的底子把java8的特性好好捋一遍
- 刷jvm
- 刷spring及其他javaEE架构
- 刷分布式和高并发调优相关知识
- java追最新版本
网友评论