ReentrantReadWriteLock 读写锁是一种改进的排他锁,也可以称作共享/排他锁. 允许多个线程同时读取共享数据,但是一次只允许一个线程对共享数据进行更新。
读写锁通过读锁与写锁来完成读写操作. 线程在读取共享数据前必须先持有读锁,该读锁可以同时被多个线程持有,即它是共享的.线程在修改共享数据前必须先持有写锁,写锁是排他的, 一个线程持有写锁时,其他线程无法获得相应的锁。
读锁只是在读线程之间共享,任何一个线程持有读锁时,其他线程都无法获得写锁, 保证线程在读取数据期间没有其他线程对数据进行更新,使得读线程能够读到数据的最新值,保证在读数据期间共享变量不被修改。
读写锁相互的关系如下:
获得条件 | 排他性 | 作用 | |
---|---|---|---|
读锁 | 写锁未被任意线程持有 | 对读线程是共享的,对写线程是排他的 | 允许多个读线程可以同时读取共享数据,保证在读共享数据时,没有其他线程对共享数据进行修改。 |
写锁 | 该写锁未被其他线程持有,并且相应的读锁也未被其他线程持有 | 对读线程或者写线程都是排他的 | 保证写线程以独占的方式修改共享数据。 |
先查看源码:
ReentrantReadWriteLock:
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
ReadLock:
以共享的方式获取锁,tryAcquireShared < 0 ,意思就是 不是小于 0 就是获得锁了,否则执行doAcquireShared()方法,可见该方法和ReentrantLock的acquireQueued() 方法是非常相似的,就是入队的操作。
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//1. 如果写锁已经被获取并且获取写锁的线程不是当前线程的话,当前
// 线程获取读锁失败返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//2. 当前线程获取读锁
compareAndSetState(c, c + SHARED_UNIT)) {
//3. 下面的代码主要是新增的一些功能,比如getReadHoldCount()方法
//返回当前获取读锁的次数
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//4. 处理在第二步中CAS操作失败的自旋已经实现重入性
return fullTryAcquireShared(current);
}
tryAcquireShared()方法
有意思的是tryAcquireShared()[ 该方法在AQS中的 ]中的方法:int r = sharedCount(c); 和 int c = getState(); exclusiveCount(c) : 看它们的源码:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
int类型是4个字节,32bit,SHARED_SHIFT=16,是个分割点,看源码得知,r变量是读锁获取的计数,使用exclusiveCount(c)计算出来的是写锁获取的计数,读锁获取计数采用高16位,写锁获取采用低16位。如图:
回归正题,在分割点,可以类比它们各自占16bit,由左向右,它们各自的起始点按平常的二进制计算即可,它们各自算出的数采用分割点的话,跟传入的数是一样的,这点无需多虑。
看看获取锁的判断条件:
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {}
- 第一步就是根据公平性质来是否顺序获取的,
- 第二步是判断锁有无被写线程占有,
- 第三步就是熟悉的cas了,
如果任一不通过,最后还是fullTryAcquireShared(),里头和判断if框柱的内容相似,无非继续自旋 ,读线程继续累加直到成功,个中采用threadLocal,这里不展开。
这里有个非常秒的点,有firstReader(Thread类型), 它代表第一个获得读锁的线程,同时配有一个firstCount计数器(int类型),非第一个获得读锁的线程,就会另外新建HoldCounter,采用的是threadLocal,每个获得读锁都配一个,对其进行计数,释放时也按数目释放。采用firstReader,是为了只有一个读线程的时候,就不需要new HoldCounter 了,也就是不需要用到threadLocal那么复杂了。
要明白HoldCounter就要先明白读锁。前面提过读锁的内在实现机制就是共享锁,对于共享锁其实我们可以稍微的认为它不是一个锁的概念,它更加像一个计数器的概念。一次共享锁操作就相当于一次计数器的操作,获取共享锁计数器+1,释放共享锁计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。
其中关键点:
r < MAX_COUNT 这个判断条件非常秒,因为读写是互斥的,MAX_COUNT是r高位=1时 - 1 的数值,如果r不为0,那就是大于 MAX_COUNT了,证明写锁正在占有锁,就乖乖去排队等锁。
条件 !readerShouldBlock ,注意取反,无非就是判断 该线程 要不要阻塞,分别分为公平和非公平、只有返回-1拿不到锁,才会进入doAcquireShared,还是双向队列,注意addWaiter(Node.Shared),非公平的!s.isShared() 已经不满足,直接外面判断true,直接争锁,公平还是那个熟悉的hasQueuedPredecessors(),AQS中介绍过,就是查看队列有无已经排队的,有就加入,无就争锁,这里不展开。
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
writeLock
写锁是排他的,代码中的acquire和ReentrantLock非常相似,Node同样是Exclusive,tryAcquire()方法也被ReentrantReadWriteLock重写。
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
展开看看tryAcquire()以排他的方式获取锁
只讲这部分,后续其他和ReentrantLock一样,不展开。
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
其中关键点:
拿低16的写锁数值,if (w + exclusiveCount(acquires) > MAX_COUNT)这句判断和读锁一样秒,不展开说,就是重入的意思。
writerShouldBlock代码:
- 非公平直接了当放回false,外面就靠compareAndSetState能不能抢的锁了,不能就去排队。
- 公平的话,还是熟悉的hasQueuedPredecessors(),队列没有前辈就抢锁,抢失败去排队,如果有前辈,直接ture,后面不用cas,直接去排队。
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
当然。它们还有tryLock、tryLock(timeout, timeUnit)、tryLockInterruptibly()、unlock()。基本原理和ReentrantLock差不多,这里就不展开。
锁降级
在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
仔细想想,这个设计是合理的:因为当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;写锁可以“降级”为读锁;读锁不能“升级”为写锁。
网友评论