美文网首页
J.U.C源码阅读之ReentrantReadWriteLock

J.U.C源码阅读之ReentrantReadWriteLock

作者: Stone_soul | 来源:发表于2017-01-04 22:19 被阅读0次

    一、框架图:

    框架图

    从中可以看出:

    (01) ReentrantReadWriteLock实现了ReadWriteLock接口。ReadWriteLock是一个读写锁的接口,提供了"获取读锁的readLock()函数" 和 "获取写锁的writeLock()函数"。

    (02) ReentrantReadWriteLock中包含:sync对象,读锁readerLock和写锁writerLock。读锁ReadLock和写锁WriteLock都实现了Lock接口。读锁ReadLock和写锁WriteLock中也都分别包含了"Sync对象",它们的Sync对象和ReentrantReadWriteLock的Sync对象 是一样的,就是通过sync,读锁和写锁实现了对同一个对象的访问。

    (03) 和"ReentrantLock"一样,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括"公平锁"FairSync和"非公平锁"NonfairSync。sync对象是"FairSync"和"NonfairSync"中的一个,默认是"NonfairSync"。

    二、介绍

    ReadWriteLock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作。

    “读取锁”用于只读操作,它是“共享锁”,能同时被多个线程获取。

    “写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线程锁获取。

    注意:不能同时存在读取锁和写入锁!

    ReadWriteLock是一个接口。ReentrantReadWriteLock是它的实现类,ReentrantReadWriteLock包括子类ReadLock和WriteLock。

    三、特点

    复用的state值

    state表示持有锁的数量,因为ReentrantReadWriteLock分为“读锁”和“写锁”两把锁,所以它把低4位用来表示“写锁(独占锁)”的持有数,其他位数表示“读锁(共享锁)”的持有数。

    代码:

    /*

    * Read vs write count extraction constants and functions.

    * Lock state is logically divided into two unsigned shorts:

    * The lower one representing the exclusive (writer) lock hold count,

    * and the upper the shared (reader) hold count.

    */

    static final intSHARED_SHIFT=16;

    static final intSHARED_UNIT= (1<

    static final intMAX_COUNT= (1<

    static final intEXCLUSIVE_MASK= (1<

    /** Returns the number of shared holds represented in count  */

    static intsharedCount(intc)    {returnc >>>SHARED_SHIFT;}

    /** Returns the number of exclusive holds represented in count  */

    static intexclusiveCount(intc) {returnc &EXCLUSIVE_MASK;}

    ThreadLocal变量,保持线程和获取“共享锁”的次数

    代码

    static final classThreadLocalHoldCounter

    extendsThreadLocal {

    publicHoldCounterinitialValue() {

    return newHoldCounter();

    }

    }

    /**

    * The number of reentrant read locks held by current thread.

    * Initialized only in constructor and readObject.

    * Removed whenever a thread's read hold count drops to 0.

    */

    private transientThreadLocalHoldCounterreadHolds;

    四、源码分析

    还是从lock()开始分析:

    ReadLock中的lock():

    public voidlock() {

    sync.acquireShared(1);

    }

    AQS中的acquireShared():

    public final voidacquireShared(intarg) {

    if(tryAcquireShared(arg) <0)

    doAcquireShared(arg);

    }

    ReentrantReadWriteLock中的tryAcquireShared(arg)

    protected final inttryAcquireShared(intunused) {

    /*

    * Walkthrough:

    * 1. If write lock held by another thread, fail.

    * 2. Otherwise, this thread is eligible for

    *    lock wrt state, so ask if it should block

    *    because of queue policy. If not, try

    *    to grant by CASing state and updating count.

    *    Note that step does not check for reentrant

    *    acquires, which is postponed to full version

    *    to avoid having to check hold count in

    *    the more typical non-reentrant case.

    * 3. If step 2 fails either because thread

    *    apparently not eligible or CAS fails or count

    *    saturated, chain to version with full retry loop.

    */

    Thread current = Thread.currentThread();

    intc = getState();

    if(exclusiveCount(c) !=0&&       //如果是独占锁,而且不是当前线程,直接返回-1

    getExclusiveOwnerThread() != current)

    return-1;

    intr =sharedCount(c);

    if(!readerShouldBlock() &&        //如果不需要“阻塞等待”,而且“读取锁”的计数小于“MAX_COUNT”,“读取锁”的状态+1

    compareAndSetState(c,c +SHARED_UNIT)) {

    if(r ==0) {

    firstReader= current;

    firstReaderHoldCount=1;

    }else if(firstReader== current) {

    firstReaderHoldCount++;

    }else{

    HoldCounter rh =cachedHoldCounter;      //readHolds是一个ThreadLocal,记录当前线程的获取“读取锁”的次数,cacheHoldCounter是当前线程的缓存,避免每次都从ThreadLocal中拿。这个次数当然要+1

    if(rh ==null|| rh.tid!=getThreadId(current))

    cachedHoldCounter= rh =readHolds.get();

    else if(rh.count==0)

    readHolds.set(rh);

    rh.count++;

    }

    return1;

    }

    returnfullTryAcquireShared(current);

    }

    说明:tryAcquireShared()的作用是尝试获取“共享锁”。

    如果在尝试获取锁时,“不需要阻塞等待”并且“读取锁的共享计数小于MAX_COUNT”,则直接通过CAS函数更新“读取锁的共享计数”,以及将“当前线程获取读取锁的次数+1”。

    否则,通过fullTryAcquireShared()获取读取锁。

    ReentrantReadWriteLock中的fullTryAcquireShared(thread)

    final intfullTryAcquireShared(Thread current) {

    /*

    * This code is in part redundant with that in

    * tryAcquireShared but is simpler overall by not

    * complicating tryAcquireShared with interactions between

    * retries and lazily reading hold counts.

    */

    HoldCounter rh =null;

    for(;;) {

    intc = getState();

    if(exclusiveCount(c) !=0) {//如果是独占锁,且当前线程不是“独占锁”的持有者,则什么都不干,直接返回-1

    if(getExclusiveOwnerThread() != current)

    return-1;

    //如果“需要阻塞等待”。

    //(01) 当“需要阻塞等待”的线程是第1个获取锁的线程的话,则继续往下执行。

    //(02) 当“需要阻塞等待”的线程获取锁的次数=0时,则返回-1。

    }else if(readerShouldBlock()) {

    // Make sure we're not acquiring read lock reentrantly

    if(firstReader== current) {

    // assert firstReaderHoldCount > 0;

    }else{

    if(rh ==null) {

    rh =cachedHoldCounter;

    if(rh ==null|| rh.tid!=getThreadId(current)) {

    rh =readHolds.get();

    if(rh.count==0)

    readHolds.remove();

    }

    }

    if(rh.count==0)

    return-1;

    }

    }

    //如果共享统计数超过MAX_COUNT,则抛出异常。

    if(sharedCount(c) ==MAX_COUNT)

    throw newError("Maximum lock count exceeded");

    if(compareAndSetState(c,c +SHARED_UNIT)) {

    if(sharedCount(c) ==0) {//更新state值,如果还没有线程获得“读锁”,那么就把当前线程更新为firstReader,firstReaderHoldCounter值设为1

    firstReader= current;

    firstReaderHoldCount=1;

    }else if(firstReader== current) {//如果当前线程就是锁的持有者,firstReadHoldCounter+1

    firstReaderHoldCount++;

    }else{//都不满足的话,从缓存或者LocalHold中获取当前线程获得“读锁”的次数,+1

    if(rh ==null)

    rh =cachedHoldCounter;

    if(rh ==null|| rh.tid!=getThreadId(current))

    rh =readHolds.get();

    else if(rh.count==0)

    readHolds.set(rh);

    rh.count++;

    cachedHoldCounter= rh;// cache for release

    }

    return1;

    }

    }

    }

    说明:fullTryAcquireShared()会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。

    doAcquireShared()定义在AQS函数中

    privatevoiddoAcquireShared(intarg) {//addWaiter(Node.SHARED)的作用是,创建“当前线程”对应的节点,并将该线程添加到CLH队列中。finalNode node =addWaiter(Node.SHARED);booleanfailed =true;try{booleaninterrupted =false;for(;;) {//获取“node”的前一节点finalNode p =node.predecessor();//如果“当前线程”是CLH队列的表头,则尝试获取共享锁。if(p ==head) {intr =tryAcquireShared(arg);if(r >= 0) {

    setHeadAndPropagate(node, r);

    p.next=null;//help GCif(interrupted)

    selfInterrupt();

    failed=false;return;

    }

    }//如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,//需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。if(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())

    interrupted=true;

    }

    }finally{if(failed)

    cancelAcquire(node);

    }

    }

    说明:doAcquireShared()的作用是获取共享锁。

    它会首先创建线程对应的CLH队列的节点,然后将该节点添加到CLH队列中。CLH队列是管理获取锁的等待线程的队列。

    如果“当前线程”是CLH队列的表头,则尝试获取共享锁;否则,则需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。

    doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!

    相关文章

      网友评论

          本文标题:J.U.C源码阅读之ReentrantReadWriteLock

          本文链接:https://www.haomeiwen.com/subject/ubeavttx.html