概述
读写锁之前提到锁(如Mutex和Reentrant Lock) 都是排他锁, 这些锁在同一时刻只允许一个线程进行访问, 而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁能够简化读写交互场景的编程方式。假设在程序中定义一个共享的用作缓存数据结构,它大部分时间提供读服务(例如查询和搜索),而写操作占有的时间很少,但是写操作完成之后的更新需要对后续的读服务可见。在没有读写锁支持的(Java 5之前) 时候, 如果需要完成上述工作就要使用Java的等待通知机制, 就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行(写操作之间依靠synchronized关键进行同步) , 这样做的目的是使读操作能读取到正确的数据, 不会出现脏读。改用读写锁实现上述功能,只需要在读操作时获取读锁,写操作时获取写锁即可。当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行,编程方式相对于使用等待通知机制的实现方式而言,变得简单明了。一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性与吞吐量。
Java并发包提供的读写锁是实现是ReentrantReadWriteLock,他提供的特性如下。
image-202009092325093421 ReentrantReadWriterLock 基础
ReentrantReadWriteLock实现ReadWriteLock接口
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
ReentrantReadWriterLock 通过两个内部类实现 Lock 接口,分别是 ReadLock,WriterLock 类。与 ReentrantLock一样,ReentrantReadWriterLock 同样使用自己的内部类Sync(继承AbstractQueuedSynchronizer)。具体类图的UML如下
image-20200920001413106ReentrantReadWriteLock初始化代码如下,默认是非公平锁。
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
首先ReentrantReadWriterLock使用一个32位的int类型来表示锁被占用的线程数(AQS中的state),与ReentrantLock中的state类似(ReentrantLock中表示锁被同一个线程重复获取的次数)。但是读写锁需要记录读锁与写锁两种状态,读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态。采取的办法是,高16位用来表示读锁一共被持有的次数(包括多个线程),用低16位表示写锁被同一个线程申请的次数。
读写锁的状态为state
写状态等于 state&0x0000FFFF(抹除高16位)
读状态等于 state>>>16(无符号位补0右移16位)
当写状态增加1时,等于state+1
当读状态增加1时,等于state+(1<<16)及state + 0x00010000
结论:如果state不等于0,当写状态(state&0x0000FFFF)等于0,而读状态(state>>>16)大于0,则表示该读写锁的读锁已被获取
读写状态的源码
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
2 ReentrantReadWriteLock.WriteLock
写锁是一个支持重入的排他锁,如果当前线程已经持有了写锁,则增加写状态。如果当前线程在申请写锁时,读锁已经被其他线程持有(读状态不为0)或者该线程不是已经持有写锁的线程,则当前线程进入等待状态。
2.1 写锁的获取
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire去申请写锁,当申请写锁失败时,把当前线程构造为Node节点加入AQS同步队列中,同时使当前线程进行park。
2.1.1 tryAcquire
申请写锁失败的情况
- 读锁被其他线程持有
- 同一个线程先持有读锁,在申请写锁
- 写锁被其他线程持有
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取同步状态
int c = getState();
// 获取写状态值
int w = exclusiveCount(c);
// 判断是否有线程持有同步状态,即线程持有了读锁或者写锁
if (c != 0) {
// 如果写状态为0,则表示写锁未被持有,但是c!=0,表示读锁已经被其他线程持有。获取锁失败
// 如果写状态不为0, 当前线程不是持有锁的线程。获取锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 当w != 0 且 当前线程是持有锁的线程(写锁重入或者同一个线程进行writeLock->readLock->writeLock),进行下面操作
// 如果重入读锁的次数超过限制,抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 设置同步状态为写锁获重入次数
setState(c + acquires);
return true;
}
// 当同步状态为0时,表示读锁或者写锁都未被持有
// 如果写线程应该被阻塞 或者 CAS设置同步状态失败,则返回false
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 申请写锁成功,设置当前线程为独占线程
setExclusiveOwnerThread(current);
return true;
}
addWaiter与acquireQueued源码解析与上篇文章ReentrantLock独占锁相同,参考上篇文章即可。
2.2 写锁的释放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
release与上篇文章ReentrantLock独占锁大体相同,参考上篇文章即可。
3 ReentrantReadWriteLock.ReadLock
读锁是一个支持重入的共享锁,同一时刻它能被多个线程同时持有,在没有其它写线程访问时,读锁总会被申请成功。如果当前线程已经持有了读锁,则增加读状态即可。
- 如果当前线程在申请读锁时,写锁已经被其它线程持有,则进入等待状态。
- 如果当前线程在申请读锁时,写锁已经被当前线程持有(锁降级)也是可以申请读锁成功。
阅读申请读锁的源码前。先考虑下,当读锁重入时,怎么存储每个线程读锁的重入次数呢?
定义了线程本地变量相关的属性如下:
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
上述这4个变量,其实就是完成一件事情,将获取读锁的线程放入线程本地变量(ThreadLocal),方便从整个上下文,根据当前线程获取持有锁的次数信息。其实 firstReader,firstReaderHoldCount ,cachedHoldCounter 这三个变量就是为readHolds变量服务的,是一个优化手段,尽量减少直接使用readHolds.get方法的次数,firstReader与firstReadHoldCount保存第一个获取读锁的线程,也就是readHolds中并不会保存第一个获取读锁的线程;cachedHoldCounter 缓存的是最后一个获取线程的HolderCount信息,该变量主要是在如果当前线程多次获取读锁时,减少从readHolds中获取HoldCounter的次数。
3.1 读锁的获取
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
3.3.1 tryAcquireShared
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
// 获取当前线程
Thread current = Thread.currentThread();
// 获取同步状态
int c = getState();
// 如果写锁状态不为0,则表示存在写锁。且当前线程不是持有写锁的线程,则申请读锁失败。
// 如果写锁状态不为0,则表示存在写锁。且当前线程是持有写锁的线程(同一个线程先持有写锁,再申请读锁->锁降级)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 写锁状态为0 或者 锁降级 -> 会走到此处
int r = sharedCount(c);
/**
* 1. 判断读线程是否应该被阻塞
* 2. 判断读状态是否超过最大值
* 3. CAS设置同步状态释放成功(CAS失败场景-> 并发时,多个线程同时申请读锁)
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 获取读锁的第一个线程
if (r == 0) {
// firstReader:获取读锁的第一个线程
firstReader = current;
// firstReaderHoldCount:第一个读线程持有读锁的重入次数
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 当前线程为第一个读线程 -> 第一个读线程的重入次数+1
firstReaderHoldCount++;
} else {
// 每一个读线程持有读锁的重入次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// 线程第一次申请读锁,或者之前持有读锁后锁重入。从ThreadLocal中获取重入次数并更新
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 读锁需要阻塞readerShouldBlock为True,当前线程持有写锁再次申请读锁时,并且有其他写锁在当前线程的下一个节点等待。如果是这种情况,除非当前
// 线程占有锁的下个线程取消,否则进入fullTryAcquireShared方法也无法获取锁。
// CAS失败(即申请读锁失败)
return fullTryAcquireShared(current);
}
3.2.2 readerShouldBlock
作用:判断读线程是否被阻塞
tryAcquireShared中的线程执行到此处去申请读锁时,有其他写线程去申请写锁的情形如下。
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT))
- 读线程申请读锁执行到此处时,即使上一步判断出没有其他线程持有写锁。由于并发问题有一个线程去申请写锁并且成功,导致读线程下一步执行的时候。readerShouldBlock为false但是此时CAS失败进行fullTryAcquireShared,还会再次判断是否有其他线程持有写锁。返回-1
- 读线程申请读锁执行到此处时,两个写线程去申请写锁,有一个申请写锁成功,另一个在排队。此时AQS 对列为 head->tail ; A->B此时readerShouldBlock为true,进行fullTryAcquireShared,还会再次判断是否有其他线程持有写锁。返回-1
- 当前线程已经持有写锁再去申请读锁执行到此处时(锁降级),此时另外一个线程申请写锁失败,会加入AQS队列中。head->tail ; A->B此时readerShouldBlock为true进行fullTryAcquireShared尝试去申请读锁。
- 当前线程已经持有了读锁,再次重入或者其他线程申请读锁执行到此处时,已经有其他线程申请写锁并失败了。AQS队列中。head->tail ; A->B此时readerShouldBlock为true进行fullTryAcquireShared尝试去申请读锁。
- 当是公平锁时,readerShouldBlock()返回true的条件是当前线程锁在的节点有前驱节点。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
-
当是非公平锁时,readerShouldBlock()返回true的条件是同步队列中,头结点的下一个节点是独占节点,即为写锁等待。
该方法如果头节点不为空,并头节点的下一个节点不为空,并且不是共享模式【独占模式,写锁】、并且线程不为空。则返回true,说明有当前申请读锁的线程占有写锁,并有其他写锁在申请。为什么要判断head节点的下一个节点不为空,或是thread不为空呢?因为第一个节点head节点是当前持有写锁的线程,也就是当前申请读锁的线程,这里,也就是锁降级的关键所在,如果占有的写锁不是当前线程,那线程申请读锁会直接失败。
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
3.3.3 fullTryAcquireShared
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
// 当前线程持有锁计数的变量
HoldCounter rh = null;
for (;;) {
// 获取同步状态
int c = getState();
// 再次判断如果写状态不为0,且当前线程不是持有写锁的线程,则申请读锁失败。直接返回-1,需要排队去申请读锁
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
}
// 读线程应该被阻塞
else if (readerShouldBlock()) {
// 当前线程已经持有了读锁,再次重入或者其他线程申请读锁执行到此处时,有其他线程申请写锁并失败
// 当前是第一个持有读锁的线程时,读锁重入不需要等待
if (firstReader == current) {
}
// 申请读锁的线程已经持有写锁(写锁内部再次申请读锁,俗称锁降级)还是会失败,
// 因为有其他线程也在申请写锁,此时,只能结束本次申请读锁的请求,转而去排队,否则,将造成死锁
else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 写锁状态为0,或者当前线程持有写锁后在申请读锁(锁降级),且线程也不需要被阻塞,说明可以申请读锁
// 当前读锁状态值已经改为最大值,则抛出异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS方式设置读锁状态值,设置成功,读锁申请成功。设置失败自旋,重新从方法头开始
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 设置线程持有读锁的重入次数。放入线程本地变量中,方便在整个请求上下文(请求锁、释放锁等过程中)使用持有锁次数。
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
3.3.4 doAcquireShared
private void doAcquireShared(int arg) {
// 在队列尾部增加一个节点,锁模式为共享模式(具体参见ReentrantLock的addWaiter的方法解析)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 自旋重试
for (;;) {
// 当前节点的前驱结点
final Node p = node.predecessor();
// 前驱结点为头结点
if (p == head) {
// 尝试申请读锁
int r = tryAcquireShared(arg);
// 申请读锁成功
if (r >= 0) {
// 持有读锁,设置当前结点为Head结点。并唤醒其他等待读锁线程的节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 线程阻塞,等待被唤醒。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.3.5 setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置当前结点为头结点
setHead(node);
// 如果读锁申请成功, 或头节点为空,或头节点取消,或刚持有读锁线程的下一个节点为空
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 就是只要获取成功到读锁,那就要传播到下一个节点
//(如果一下个节点继续是读锁的申请,只要成功获取,就再下一个节点,直到队列尾部或为写锁的申请,停止传播)。
// 具体请看doReleaseShared方法
if (s == null || s.isShared())
doReleaseShared();
}
}
3.3.6 doReleaseShared
唤醒当前读锁线程的下一个线程(有可能是申请读锁的线程也有可能是申请写锁的线程)
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果节点状态为 Node.SIGNAL
if (ws == Node.SIGNAL) {
// 将状态设置为0失败(可能该节点被取消;还有一种情况就是有多个线程在运行该代码段)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 将状态设置为0 成功唤醒下一个读锁线程
unparkSuccessor(h);
}
// 如果状态为0,则设置为Node.PROPAGATE,设置为传播。(读锁链的最后一个结点)
// 在判断该节点的下一个节点是否需要阻塞时,会判断,如果状态不是Node.SIGNAL或取消状态,为了保险起见,会将前置节点状态设置为Node.SIGNAL, // 然后再次判断,是否需要阻塞
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 处理过一次 unparkSuccessor 方法后,头节点没有发生变化,就退出该方法。
// 发生变化的情况:unparkSuccessor当前线程的下一个线程获取读锁成功。改变head结点
if (h == head) // loop if head changed
break;
}
}
3.2 读锁的释放
public final boolean releaseShared(int arg) {
// 释放持有的读锁
if (tryReleaseShared(arg)) {
// 持有读锁的线程被全部释放,唤醒写一个申请写锁的线程
doReleaseShared();
return true;
}
return false;
}
3.2.1 tryReleaseShared
主要是将当前线程所持有的锁的数量信息得到(从firstReader或cachedHoldCounter,或readHolds中获取 ),然后将数量减少1,如果持有数为1,则直接将该线程变量从readHolds ThreadLocal变量中移除。
/**
*1、多个线程持有读锁(A,B,C线程,也是持有读锁的顺序)
*2、第一个持有读锁线程A释放锁
*3、最后一个持有读锁的线程C释放锁
*4、其他持有读锁的线程B释放锁
*/
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 判断当前线程是否是一个持有读锁的线程(A)
if (firstReader == current) {
// 如果第一个持有读锁线程的获取读锁的计数为1,则设置第一个获取读锁的线程为null
if (firstReaderHoldCount == 1)
firstReader = null;
// 第一个持有读锁线程存在锁重入情况,锁重入次数减1
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
// B线程释放锁会走这个分支,rh代表是C线程持有读锁的次数信息
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
// 获取读锁的重入次数 <= 1,则将当前线程的HoldCounter从readHolds中移除
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 当前线程读锁的重入次数减一
--rh.count;
}
// 这里使用死循环的方式,确保当前线程的读锁能够释放,在释放锁阶段,只有当所有的读锁被释放,才会去执行doReleaseShared方法,才会唤醒写锁。
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
4 读写锁释放获取
image-20200928230633815A线程持有写锁,B、C线程获取读锁等待写锁的释放,D线程申请写锁等待。
-
A线程释放写锁,此时新加入的线程E申请获取读锁。并发情况下E线程在B线程唤醒之前获取读锁。头结点A线程释放写锁通过unpark唤醒B节点,同时B节点也会申请锁成功。setHeadAndPropagate也会唤醒C线程。同步队列会变为如下所示
image-20200928231509553 -
A线程释放写锁,头结点A释放写锁后通过unpark唤醒B节点,同时B节点也会申请锁成功。setHeadAndPropagate也会唤醒C线程。同步队列会变为如下所示
image-20200928231509553 -
A线程持有写锁释放后,唤醒读线程申请锁B、C。当B、C释放读锁后,会唤醒申请写锁的D线程。读锁释放tryReleaseShared,会判断持有读锁的线程是否完全释放,当读锁完全释放会调用doReleaseShared来唤醒写锁。(当C先释放读锁,B后释放读锁)一样都会执行doReleaseShared来唤醒写锁
image-20200928232956889 -
A线程持有写锁释放后,此时新加入的线程E申请获取写锁。并发情况下E线程在B线程唤醒之前获取写锁,头结点A线程释放写锁通过unpark唤醒B节点,此时
B线程申请锁失败。
image-20200928233628944 -
什么时候执行doReleaseShared中的compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
A线程持有写锁释放后,唤醒B线程通过读锁的传播唤醒C线程继续唤醒D线程尝试申请写锁,申请失败继续park。此时变化如下图
image-20200929001054863
网友评论