美文网首页
J.U.C源码阅读之ReentrantReadWriteLock

J.U.C源码阅读之ReentrantReadWriteLock

作者: Stone_soul | 来源:发表于2017-01-04 22:19 被阅读0次

一、框架图:

框架图

从中可以看出:

(01) ReentrantReadWriteLock实现了ReadWriteLock接口。ReadWriteLock是一个读写锁的接口,提供了"获取读锁的readLock()函数" 和 "获取写锁的writeLock()函数"。

(02) ReentrantReadWriteLock中包含:sync对象,读锁readerLock和写锁writerLock。读锁ReadLock和写锁WriteLock都实现了Lock接口。读锁ReadLock和写锁WriteLock中也都分别包含了"Sync对象",它们的Sync对象和ReentrantReadWriteLock的Sync对象 是一样的,就是通过sync,读锁和写锁实现了对同一个对象的访问。

(03) 和"ReentrantLock"一样,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括"公平锁"FairSync和"非公平锁"NonfairSync。sync对象是"FairSync"和"NonfairSync"中的一个,默认是"NonfairSync"。

二、介绍

ReadWriteLock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作。

“读取锁”用于只读操作,它是“共享锁”,能同时被多个线程获取。

“写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线程锁获取。

注意:不能同时存在读取锁和写入锁!

ReadWriteLock是一个接口。ReentrantReadWriteLock是它的实现类,ReentrantReadWriteLock包括子类ReadLock和WriteLock。

三、特点

复用的state值

state表示持有锁的数量,因为ReentrantReadWriteLock分为“读锁”和“写锁”两把锁,所以它把低4位用来表示“写锁(独占锁)”的持有数,其他位数表示“读锁(共享锁)”的持有数。

代码:

/*

* Read vs write count extraction constants and functions.

* Lock state is logically divided into two unsigned shorts:

* The lower one representing the exclusive (writer) lock hold count,

* and the upper the shared (reader) hold count.

*/

static final intSHARED_SHIFT=16;

static final intSHARED_UNIT= (1<

static final intMAX_COUNT= (1<

static final intEXCLUSIVE_MASK= (1<

/** Returns the number of shared holds represented in count  */

static intsharedCount(intc)    {returnc >>>SHARED_SHIFT;}

/** Returns the number of exclusive holds represented in count  */

static intexclusiveCount(intc) {returnc &EXCLUSIVE_MASK;}

ThreadLocal变量,保持线程和获取“共享锁”的次数

代码

static final classThreadLocalHoldCounter

extendsThreadLocal {

publicHoldCounterinitialValue() {

return newHoldCounter();

}

}

/**

* The number of reentrant read locks held by current thread.

* Initialized only in constructor and readObject.

* Removed whenever a thread's read hold count drops to 0.

*/

private transientThreadLocalHoldCounterreadHolds;

四、源码分析

还是从lock()开始分析:

ReadLock中的lock():

public voidlock() {

sync.acquireShared(1);

}

AQS中的acquireShared():

public final voidacquireShared(intarg) {

if(tryAcquireShared(arg) <0)

doAcquireShared(arg);

}

ReentrantReadWriteLock中的tryAcquireShared(arg)

protected final inttryAcquireShared(intunused) {

/*

* Walkthrough:

* 1. If write lock held by another thread, fail.

* 2. Otherwise, this thread is eligible for

*    lock wrt state, so ask if it should block

*    because of queue policy. If not, try

*    to grant by CASing state and updating count.

*    Note that step does not check for reentrant

*    acquires, which is postponed to full version

*    to avoid having to check hold count in

*    the more typical non-reentrant case.

* 3. If step 2 fails either because thread

*    apparently not eligible or CAS fails or count

*    saturated, chain to version with full retry loop.

*/

Thread current = Thread.currentThread();

intc = getState();

if(exclusiveCount(c) !=0&&       //如果是独占锁,而且不是当前线程,直接返回-1

getExclusiveOwnerThread() != current)

return-1;

intr =sharedCount(c);

if(!readerShouldBlock() &&        //如果不需要“阻塞等待”,而且“读取锁”的计数小于“MAX_COUNT”,“读取锁”的状态+1

compareAndSetState(c,c +SHARED_UNIT)) {

if(r ==0) {

firstReader= current;

firstReaderHoldCount=1;

}else if(firstReader== current) {

firstReaderHoldCount++;

}else{

HoldCounter rh =cachedHoldCounter;      //readHolds是一个ThreadLocal,记录当前线程的获取“读取锁”的次数,cacheHoldCounter是当前线程的缓存,避免每次都从ThreadLocal中拿。这个次数当然要+1

if(rh ==null|| rh.tid!=getThreadId(current))

cachedHoldCounter= rh =readHolds.get();

else if(rh.count==0)

readHolds.set(rh);

rh.count++;

}

return1;

}

returnfullTryAcquireShared(current);

}

说明:tryAcquireShared()的作用是尝试获取“共享锁”。

如果在尝试获取锁时,“不需要阻塞等待”并且“读取锁的共享计数小于MAX_COUNT”,则直接通过CAS函数更新“读取锁的共享计数”,以及将“当前线程获取读取锁的次数+1”。

否则,通过fullTryAcquireShared()获取读取锁。

ReentrantReadWriteLock中的fullTryAcquireShared(thread)

final intfullTryAcquireShared(Thread current) {

/*

* This code is in part redundant with that in

* tryAcquireShared but is simpler overall by not

* complicating tryAcquireShared with interactions between

* retries and lazily reading hold counts.

*/

HoldCounter rh =null;

for(;;) {

intc = getState();

if(exclusiveCount(c) !=0) {//如果是独占锁,且当前线程不是“独占锁”的持有者,则什么都不干,直接返回-1

if(getExclusiveOwnerThread() != current)

return-1;

//如果“需要阻塞等待”。

//(01) 当“需要阻塞等待”的线程是第1个获取锁的线程的话,则继续往下执行。

//(02) 当“需要阻塞等待”的线程获取锁的次数=0时,则返回-1。

}else if(readerShouldBlock()) {

// Make sure we're not acquiring read lock reentrantly

if(firstReader== current) {

// assert firstReaderHoldCount > 0;

}else{

if(rh ==null) {

rh =cachedHoldCounter;

if(rh ==null|| rh.tid!=getThreadId(current)) {

rh =readHolds.get();

if(rh.count==0)

readHolds.remove();

}

}

if(rh.count==0)

return-1;

}

}

//如果共享统计数超过MAX_COUNT,则抛出异常。

if(sharedCount(c) ==MAX_COUNT)

throw newError("Maximum lock count exceeded");

if(compareAndSetState(c,c +SHARED_UNIT)) {

if(sharedCount(c) ==0) {//更新state值,如果还没有线程获得“读锁”,那么就把当前线程更新为firstReader,firstReaderHoldCounter值设为1

firstReader= current;

firstReaderHoldCount=1;

}else if(firstReader== current) {//如果当前线程就是锁的持有者,firstReadHoldCounter+1

firstReaderHoldCount++;

}else{//都不满足的话,从缓存或者LocalHold中获取当前线程获得“读锁”的次数,+1

if(rh ==null)

rh =cachedHoldCounter;

if(rh ==null|| rh.tid!=getThreadId(current))

rh =readHolds.get();

else if(rh.count==0)

readHolds.set(rh);

rh.count++;

cachedHoldCounter= rh;// cache for release

}

return1;

}

}

}

说明:fullTryAcquireShared()会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。

doAcquireShared()定义在AQS函数中

privatevoiddoAcquireShared(intarg) {//addWaiter(Node.SHARED)的作用是,创建“当前线程”对应的节点,并将该线程添加到CLH队列中。finalNode node =addWaiter(Node.SHARED);booleanfailed =true;try{booleaninterrupted =false;for(;;) {//获取“node”的前一节点finalNode p =node.predecessor();//如果“当前线程”是CLH队列的表头,则尝试获取共享锁。if(p ==head) {intr =tryAcquireShared(arg);if(r >= 0) {

setHeadAndPropagate(node, r);

p.next=null;//help GCif(interrupted)

selfInterrupt();

failed=false;return;

}

}//如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,//需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。if(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())

interrupted=true;

}

}finally{if(failed)

cancelAcquire(node);

}

}

说明:doAcquireShared()的作用是获取共享锁。

它会首先创建线程对应的CLH队列的节点,然后将该节点添加到CLH队列中。CLH队列是管理获取锁的等待线程的队列。

如果“当前线程”是CLH队列的表头,则尝试获取共享锁;否则,则需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。

doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!

相关文章

网友评论

      本文标题:J.U.C源码阅读之ReentrantReadWriteLock

      本文链接:https://www.haomeiwen.com/subject/ubeavttx.html