一、框架图:
框架图从中可以看出:
(01) ReentrantReadWriteLock实现了ReadWriteLock接口。ReadWriteLock是一个读写锁的接口,提供了"获取读锁的readLock()函数" 和 "获取写锁的writeLock()函数"。
(02) ReentrantReadWriteLock中包含:sync对象,读锁readerLock和写锁writerLock。读锁ReadLock和写锁WriteLock都实现了Lock接口。读锁ReadLock和写锁WriteLock中也都分别包含了"Sync对象",它们的Sync对象和ReentrantReadWriteLock的Sync对象 是一样的,就是通过sync,读锁和写锁实现了对同一个对象的访问。
(03) 和"ReentrantLock"一样,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括"公平锁"FairSync和"非公平锁"NonfairSync。sync对象是"FairSync"和"NonfairSync"中的一个,默认是"NonfairSync"。
二、介绍
ReadWriteLock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作。
“读取锁”用于只读操作,它是“共享锁”,能同时被多个线程获取。
“写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线程锁获取。
注意:不能同时存在读取锁和写入锁!
ReadWriteLock是一个接口。ReentrantReadWriteLock是它的实现类,ReentrantReadWriteLock包括子类ReadLock和WriteLock。
三、特点
复用的state值
state表示持有锁的数量,因为ReentrantReadWriteLock分为“读锁”和“写锁”两把锁,所以它把低4位用来表示“写锁(独占锁)”的持有数,其他位数表示“读锁(共享锁)”的持有数。
代码:
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final intSHARED_SHIFT=16;
static final intSHARED_UNIT= (1<
static final intMAX_COUNT= (1<
static final intEXCLUSIVE_MASK= (1<
/** Returns the number of shared holds represented in count */
static intsharedCount(intc) {returnc >>>SHARED_SHIFT;}
/** Returns the number of exclusive holds represented in count */
static intexclusiveCount(intc) {returnc &EXCLUSIVE_MASK;}
ThreadLocal变量,保持线程和获取“共享锁”的次数
代码
static final classThreadLocalHoldCounter
extendsThreadLocal {
publicHoldCounterinitialValue() {
return newHoldCounter();
}
}
/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
*/
private transientThreadLocalHoldCounterreadHolds;
四、源码分析
还是从lock()开始分析:
ReadLock中的lock():
public voidlock() {
sync.acquireShared(1);
}
AQS中的acquireShared():
public final voidacquireShared(intarg) {
if(tryAcquireShared(arg) <0)
doAcquireShared(arg);
}
ReentrantReadWriteLock中的tryAcquireShared(arg)
protected final inttryAcquireShared(intunused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
intc = getState();
if(exclusiveCount(c) !=0&& //如果是独占锁,而且不是当前线程,直接返回-1
getExclusiveOwnerThread() != current)
return-1;
intr =sharedCount(c);
if(!readerShouldBlock() && //如果不需要“阻塞等待”,而且“读取锁”的计数小于“MAX_COUNT”,“读取锁”的状态+1
r
compareAndSetState(c,c +SHARED_UNIT)) {
if(r ==0) {
firstReader= current;
firstReaderHoldCount=1;
}else if(firstReader== current) {
firstReaderHoldCount++;
}else{
HoldCounter rh =cachedHoldCounter; //readHolds是一个ThreadLocal,记录当前线程的获取“读取锁”的次数,cacheHoldCounter是当前线程的缓存,避免每次都从ThreadLocal中拿。这个次数当然要+1
if(rh ==null|| rh.tid!=getThreadId(current))
cachedHoldCounter= rh =readHolds.get();
else if(rh.count==0)
readHolds.set(rh);
rh.count++;
}
return1;
}
returnfullTryAcquireShared(current);
}
说明:tryAcquireShared()的作用是尝试获取“共享锁”。
如果在尝试获取锁时,“不需要阻塞等待”并且“读取锁的共享计数小于MAX_COUNT”,则直接通过CAS函数更新“读取锁的共享计数”,以及将“当前线程获取读取锁的次数+1”。
否则,通过fullTryAcquireShared()获取读取锁。
ReentrantReadWriteLock中的fullTryAcquireShared(thread)
final intfullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh =null;
for(;;) {
intc = getState();
if(exclusiveCount(c) !=0) {//如果是独占锁,且当前线程不是“独占锁”的持有者,则什么都不干,直接返回-1
if(getExclusiveOwnerThread() != current)
return-1;
//如果“需要阻塞等待”。
//(01) 当“需要阻塞等待”的线程是第1个获取锁的线程的话,则继续往下执行。
//(02) 当“需要阻塞等待”的线程获取锁的次数=0时,则返回-1。
}else if(readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if(firstReader== current) {
// assert firstReaderHoldCount > 0;
}else{
if(rh ==null) {
rh =cachedHoldCounter;
if(rh ==null|| rh.tid!=getThreadId(current)) {
rh =readHolds.get();
if(rh.count==0)
readHolds.remove();
}
}
if(rh.count==0)
return-1;
}
}
//如果共享统计数超过MAX_COUNT,则抛出异常。
if(sharedCount(c) ==MAX_COUNT)
throw newError("Maximum lock count exceeded");
if(compareAndSetState(c,c +SHARED_UNIT)) {
if(sharedCount(c) ==0) {//更新state值,如果还没有线程获得“读锁”,那么就把当前线程更新为firstReader,firstReaderHoldCounter值设为1
firstReader= current;
firstReaderHoldCount=1;
}else if(firstReader== current) {//如果当前线程就是锁的持有者,firstReadHoldCounter+1
firstReaderHoldCount++;
}else{//都不满足的话,从缓存或者LocalHold中获取当前线程获得“读锁”的次数,+1
if(rh ==null)
rh =cachedHoldCounter;
if(rh ==null|| rh.tid!=getThreadId(current))
rh =readHolds.get();
else if(rh.count==0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter= rh;// cache for release
}
return1;
}
}
}
说明:fullTryAcquireShared()会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。
doAcquireShared()定义在AQS函数中
privatevoiddoAcquireShared(intarg) {//addWaiter(Node.SHARED)的作用是,创建“当前线程”对应的节点,并将该线程添加到CLH队列中。finalNode node =addWaiter(Node.SHARED);booleanfailed =true;try{booleaninterrupted =false;for(;;) {//获取“node”的前一节点finalNode p =node.predecessor();//如果“当前线程”是CLH队列的表头,则尝试获取共享锁。if(p ==head) {intr =tryAcquireShared(arg);if(r >= 0) {
setHeadAndPropagate(node, r);
p.next=null;//help GCif(interrupted)
selfInterrupt();
failed=false;return;
}
}//如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,//需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。if(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted=true;
}
}finally{if(failed)
cancelAcquire(node);
}
}
说明:doAcquireShared()的作用是获取共享锁。
它会首先创建线程对应的CLH队列的节点,然后将该节点添加到CLH队列中。CLH队列是管理获取锁的等待线程的队列。
如果“当前线程”是CLH队列的表头,则尝试获取共享锁;否则,则需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。
doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!
网友评论