下图是ReentrantReadWriteLock读写锁的UML类图。从图中可以清晰的看见,该类主要有读锁和写锁,以及同步器Sync组成。读锁调用 的是同步器Sync中的共享获取释放state方法tryAcquireShared,tryReleaseShared.当使用读写锁中的写锁时,调用的是tryAcquire,tryRelease,互斥的获取释放state的方法。
读写锁满足的语义:
- 写锁完成的更新,对后续的读锁可见。
- 可以由多个线程进入读锁
- 当有写锁之后,其他线程无法获取写锁和读锁
- 锁重入特性,读锁和写锁都可以重入。
- 可以从写锁,降级为读锁
image.png
从这个类的整体结构来说,ReentrantReadWriteLock聚合了读锁ReadLock和写锁WriteLock,分别实现Lock接口,写锁在获取锁资源的时候,调用内聚的Sync同步器(AQS的子类)内部的tryAcquire和tryRelease方法,去独占式的获取锁资源。读锁在获取锁资源的时候调用内聚的Sync同步器(AQS的子类)内部的tryAcquireShared和tryReleaseShared方法,去共享式的获取锁资源。
但是这两种锁资源,存在关系写锁完成的更新,对后续的读锁可见。
**
那怎么办?
只能两个锁状态,都放在一个volatile变量上,写锁调用tryAcquire和tryRelease对state进行操作完成。另外的读线程,立马可以感知到。这也是读写锁设计巧妙的地方,读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写。
image.png
假设当前同步状态值为S,get和set的操作如下:
(1)获取写状态:
S&0x0000FFFF:将高16位全部抹去,也就是和EXCLUSIVE_MASK做与运算。
(2)获取读状态:
S>>>16:无符号补0,右移16位
(3)写状态加1:
S+1
(4)读状态加1:
S+(1<<16)即S + 0x00010000 即S+SHARED_UNIT
接下来看源码分析:
写锁的获取和释放
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); //获取写状态位
if (c != 0) {
if (w == 0 || //如果c!=0&&w==0 ,直接return false。 因为读锁状态位!=0,也就是
//说有其他线程占用读锁。那么这时候,对共享资源修改,其它读线程可能无法在写线程结束
//时,感知到写线程对共享资源的修改
current != getExclusiveOwnerThread()) //另外一种情况,假如说w!=0,
//current != getExclusiveOwnerThread() ,不是当前线程,写锁是互斥的,所以return false
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire 最后锁重入
setState(c + acquires);
return true;
}
//走到这里说明c=0 ,这个时候,可能同步队列中还有线程需要抢占锁资源
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);//最后cas设置同步状态成功,把当前线程设置成独占线程
return true;
}
writerShouldBlock判断是否需要阻塞。公平和非公平方式实现不同,在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。
FairSync公平模式
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
NonfairSync非公平模式直接返回false
final boolean writerShouldBlock() {
return false; // writers can always barge
}
protected final boolean tryRelease(int releases) {
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
就是state减去入参,就好了,和ReentrantLock很相似。
读锁的获取和释放
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) //如果写锁线程数 != 0 ,且独占锁不是当前线程则返
//回失败,因为存在锁降级
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) { ///r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++; //// 当前线程为第一个读线程,表示第一个读锁线程重入
} else {
HoldCounter rh = cachedHoldCounter; //cachedHoldCounter是上一个线程对应的重入次数
//第三个线程进来的时候,rh ,cachedHoldCounter存的是第二个线程的HoldCounter
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get(); //readHolds是个ThreadLocal变量
//第二个读线程进来的时候,把当前的HoldCounter对象放入cachedHoldCounter
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
当compareAndSetState(c, c + SHARED_UNIT)失败的时候调用,以下方法
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) { //// 无限循环
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
读锁的释放,tryReleaseShared方法
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) { //// 当前线程为第一个读线程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter; // 获取缓存的计数器
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) { // 无限循环
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
网友评论