美文网首页
JUC原理之ReentrantReadWriteLock

JUC原理之ReentrantReadWriteLock

作者: AlienPaul | 来源:发表于2020-03-18 10:40 被阅读0次

    什么是ReentrantReadWriteLock

    ReentrantReadWriteLockReentrantLock同样是重入锁,支持公平方式和非公平方式获取锁。

    ReentrantReadWriteLock同时支持共享锁和独占锁。对于共享资源的读写操作而言,我们允许多个线程同时读取,但是同一时刻仅允许一个线程写入,并且写入操作进行的时候其他线程无法读取。ReentrantReadWriteLock正是这样的一种锁,内部维护了两个锁:读锁和写锁。如下所示:

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    

    其中读锁是共享锁,写锁是独占锁。

    实现方式

    ReentrantReadWriteLock使用同一个AQS队列来处理共享锁和独占锁的请求。

    通过上一篇JUC原理之ReentrantLock我们知道AQS的state表示某一线程的重入深度。但是在ReentrantReadWriteLock中仍然这样记录是不能满足要求的,原因如下:

    • ReentrantReadWriteLock同时有读锁和写锁两种,那么这两种锁当前持有的个数都要记录
    • 读锁是共享锁,可以被多个线程同时持有,但又是可重入锁。这就意味着每个持有读锁的线程的重入深度也需要记录

    下面将在代码的分析中逐个说明如何解决上述的两个问题。

    为了解决同时存放读锁和写锁个数的问题,ReentrantReadWriteLock的同步状态(state)这个整型变量分为两部分:

    • 高16位用于存放共享锁的持有数量
    • 低16位用于存放独占锁的持有数量

    如下方代码所示(摘自Sync类):

    // 共享数量偏移量,使用state的高16位存放,自然偏移16
    static final int SHARED_SHIFT   = 16;
    // 共享锁数量的1在state中对应着是几,当然是高16位的最低位+1了
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 共享锁或独占锁的最大持有数量,state变量分成了2部分使用,每种所的持有最大量变为2的16次方-1
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 返回一个低16位全为1的整型
    // 和子网掩码用法类似,state和EXCLUSIVE_MASK按位与,可以得到当前独占锁持有的数量
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
    /** Returns the number of shared holds represented in count  */
    // 获取当前持有共享锁总数
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** Returns the number of exclusive holds represented in count  */
    // 获取当前持有的独占锁数
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    

    获取共享锁的过程

    注意一种特殊情况:允许同一个线程先获取独占锁,没有释放的时候又获取共享锁。

    我们分析下tryAcquireShared方法的实现:

    protected final int tryAcquireShared(int unused) {
        /*
         * Walkthrough:
         * 1. If write lock held by another thread, fail.
         * 2. Otherwise, this thread is eligible for
         *    lock wrt state, so ask if it should block
         *    because of queue policy. If not, try
         *    to grant by CASing state and updating count.
         *    Note that step does not check for reentrant
         *    acquires, which is postponed to full version
         *    to avoid having to check hold count in
         *    the more typical non-reentrant case.
         * 3. If step 2 fails either because thread
         *    apparently not eligible or CAS fails or count
         *    saturated, chain to version with full retry loop.
         */
        
        // 获取当前线程
        Thread current = Thread.currentThread();
        int c = getState();
        // 如果有线程持有独占锁,并且独占线程不是当前线程,获取锁失败
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
            
        // 获取共享锁数量
        int r = sharedCount(c);
        // 判断是否可以获取读锁(共享锁),这个方法在公平锁和非公平锁中的实现不同,稍后分析
        // r值必须小于MAX_COUNT,否则会溢出
        // CAS尝试读锁个数增加1(高16位)
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            // 如果目前没有线程得到共享锁
            if (r == 0) {
                // 设置firstReader(第一个读线程)为当前线程
                firstReader = current;
                // 设置firstReader的重入数量为1
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 这个分支含义为第一个读线程为当前线程,并且它再次尝试获得锁(重入)
                firstReaderHoldCount++;
            } else {
                // 这里要更新线程持有的读锁数目(线程的重入深度)
                // HoldCounter是一个线程持有锁数量的计数器,保存了线程id和锁计数两个变量
                // cachedHoldCounter是最后一个获取共享锁的线程对应的HoldCounter,通常来说释放锁的线程就是上一个获取锁的线程。把这个线程的HoldCounter缓存可以减少ThreadLocal查找的时间
                HoldCounter rh = cachedHoldCounter;
                // 如果rh为空,或者rh的线程ID不是当前的线程ID,更新cachedHoldCounter为当前线程对应的HoldCounter
                // readHolds变量在Sync构造函数中创建,线程第一次访问的时候返当前线程对应的HoldCounter,count值为0
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                    
                // 如果rh为当前线程的cache,并且count为0(线程第一次获取共享锁)
                // 记录当前线程的HoldCounter到ThreadLocal
                else if (rh.count == 0)
                    readHolds.set(rh);
                
                当前线程的共享锁数量加1
                rh.count++;
            }
            // 此处获取读锁成功,返回1
            return 1;
        }
        // 如果上面的if条件没有满足,需要走完整的获取共享锁流程
        // 上面的步骤只是获取共享锁的快速方法
        return fullTryAcquireShared(current);
    }
    

    注意:这里有三个变量:firstReader, firstReaderHoldCountcachedHoldCounter。他们的作用如下:

    • firstReader, firstReaderHoldCount只存储第一个获取到共享锁的线程和重入深度,cachedHoldCounter是最近一次获取到共享锁的线程HoldCounter的缓存。
    • 大多数情况而言没有锁竞争,只有一个线程获取锁和释放锁,此时只需要使用firstReader, firstReaderHoldCount即可保存线程的重入次数,无需使用readHolds这个ThreadLocal类型,提高了运行效率。
    • 另一种常见的场景是具有锁竞争的环境,一个线程获取了锁,处理完业务后会紧接着释放锁。此时使用cachedHoldCounter可以避免查找ThreadLocalMap,同样提高了代码运行效率。

    HoldCounter类用于保存每个线程持有的共享锁数量,需要在ThreadLocal中使用。代码如下:

    /**
     * A counter for per-thread read hold counts.
     * Maintained as a ThreadLocal; cached in cachedHoldCounter
     */
    static final class HoldCounter {
        int count = 0;
        // Use id, not reference, to avoid garbage retention
        final long tid = getThreadId(Thread.currentThread());
    }
    

    readHolds保存了每一个线程的共享锁数量,它是ThreadLocalHolderCounter类型。它继承了ThreadLocal类,代码如下:

    /**
     * ThreadLocal subclass. Easiest to explicitly define for sake
     * of deserialization mechanics.
     */
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    

    Sync初始化的时候readHolds变量也一同被初始化。线程第一次调用readHolds.get()时候,自动调用initialValue方法,创建并返回一个当前线程对应的HoldCounter

    接下来分析完整版本的获取共享锁方法fullTryAcquireShared,多用于处理CompareAndSetState失败时候的逻辑(tryAcquireSharedCompareAndSetState失败后会执行此方法)。

    /**
     * Full version of acquire for reads, that handles CAS misses
     * and reentrant reads not dealt with in tryAcquireShared.
     */
    final int fullTryAcquireShared(Thread current) {
        /*
         * This code is in part redundant with that in
         * tryAcquireShared but is simpler overall by not
         * complicating tryAcquireShared with interactions between
         * retries and lazily reading hold counts.
         */
        HoldCounter rh = null;
        // 自旋
        for (;;) {
            // 获取当前状态
            int c = getState();
            // 如果独占锁的持有数量不是0
            if (exclusiveCount(c) != 0) {
                // 检查持有独占锁的线程是不是当前线程,如果不是返回-1,获取锁失败
                // 此处如果持有独占锁的线程是当前线程,方法不会返回
                // 意思是允许同一个线程获取独占锁之后又获取共享锁
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) {
                // 如果不允许获取共享锁
                // 字面上是这个意思,但逻辑上的含义为如果readerShouldBlock()返回true,只允许重入,不允许新线程获得锁
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    // 如果firstReader不是当前线程
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        // 如果cachedHoldCounter为空或者是cachedHoldCounter的HoldCounter和当前线程不对应
                        if (rh == null || rh.tid != getThreadId(current)) {
                            // 获取当前线程的HoldCounter
                            rh = readHolds.get();
                            // 如果count为0,从readHolds清除
                            // 该线程已不持有共享锁,不需要再记录该线程的共享锁数量
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    // 如果rh的count为0,说明当前线程没有获得共享锁,获取失败返回-1
                    // 如果rh的count不为0,说明当前线程持有共享锁,继续向下进行,允许再次获取共享锁
                    if (rh.count == 0)
                        return -1;
                }
            }
            // 如果共享锁数量为最大值,抛出异常
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // CAS增加共享锁持有数量
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                // 如果成功,后面的逻辑和tryAcquireShared方法一致,不再赘述
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    

    获取独占锁的过程

    获取独占锁的入口为tryAcquire方法,代码如下:

    protected final boolean tryAcquire(int acquires) {
        /*
         * Walkthrough:
         * 1. If read count nonzero or write count nonzero
         *    and owner is a different thread, fail.
         * 2. If count would saturate, fail. (This can only
         *    happen if count is already nonzero.)
         * 3. Otherwise, this thread is eligible for lock if
         *    it is either a reentrant acquire or
         *    queue policy allows it. If so, update state
         *    and set owner.
         */
         
        // 获取当前线程
        Thread current = Thread.currentThread();
        int c = getState();
        // 获取独占锁数量
        int w = exclusiveCount(c);
        // 如果有独占锁或者共享锁
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            // 如果独占锁数量为0,说明有共享锁,获取独占锁失败,返回false
            // 如果独占锁数量不为0,持有锁的线程不是当前线程,说明其他线程持有独占锁。本线程获取独占锁失败,返回false
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            // 如果获取独占锁之后,独占锁计数大于MAX_COUNT,说明独占锁计数溢出
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            // 更新独占锁数量
            setState(c + acquires);
            // 获取独占锁成功
            return true;
        }
        // 如果没有任何锁
        // 允许当前线程获取独占锁
        // 如果CAS设置state失败,代表获取锁失败,返回false
        // 如果不允许当前线程获取独占锁,直接返回false
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        // 如果成功,设置独占线程为当前线程,返回true
        setExclusiveOwnerThread(current);
        return true;
    }
    

    释放共享锁的过程

    释放共享锁的方法为tryReleaseShared,分析如下:

    protected final boolean tryReleaseShared(int unused) {
        // 获取当前线程
        Thread current = Thread.currentThread();
        // 如果当前线程是firstReader
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            // 如果firstReader持有锁数量为1,设置firstReader为null
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                // 否则firstReader持有锁数量减1
                firstReaderHoldCount--;
        } else {
            // 获得当前线程对应的HoldCounter
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            // 如果count <=0,需要从readHolds变量中移除此HoldCounter
            // 意味着该线程不再持有共享锁,无需再记录重入次数
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            // 重入数量减1
            --rh.count;
        }
        // 死循环
        for (;;) {
            // 反复尝试CAS将state共享锁数量减1,成功后返回
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                // 返回结果为共享锁是否全部释放
                return nextc == 0;
        }
    }
    

    释放独占锁的方法

    释放独占锁的方法为tryRelease,分析如下:

    protected final boolean tryRelease(int releases) {
        // 如果持有独占锁的线程不是当前线程,抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // 计算新的同步状态
        int nextc = getState() - releases;
        // 判断独占锁是否全部释放
        boolean free = exclusiveCount(nextc) == 0;
        // 如果释放完后不再有独占锁,设置独占线程为null
        if (free)
            setExclusiveOwnerThread(null);
        // 设置新的同步状态
        setState(nextc);
        // 返回独占锁是否全部释放
        return free;
    }
    

    FairSync

    ReentrantLock一样,ReentrantReadWriteLock也有一个Sync对象。它也有两个子类FairSyncNonFairSync,分别代表了共享锁和独占锁的实现。

    对于FairSync而言,设想一种场景:队列中有线程等待独占锁排队,后面跟随多个等待共享锁的线程排队。这时如果有队列外线程需要获取独占锁,必须在这些等待共享锁的线程后排队。

    FairSync的代码如下:

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            // 和ReentrantLock的公平独占锁一样
            return hasQueuedPredecessors();
        }
        // 是否允许没有共享锁的线程获取共享锁(无论返回true或false,已经有共享锁的线程都允许再次获得共享锁,即允许重入)
        // 没有独占锁的时候,共享锁是可以直接获取的,所以此时共享锁不会入队列
        // 如果有需要获取独占锁的线程排队,或者是获取到独占锁的线程正在执行,后续需要共享锁的线程必须依次排队
        // 有一种情况,如果一个线程持有独占锁尚未释放,后续没有其他线程排队,此时hasQueuedPredecessors也会返回false
        // 感觉像是允许获取共享锁。但是tryAcquireShared方法先保证了没有独占锁的情况下才会获取共享锁,因此这个问题不存在
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
    

    NonFairSync

    如果队列中有等待独占锁的线程排队,后面跟随多个等待共享锁的线程排队。队列中第一个线程会和队列外线程抢占独占锁。如果队列外线程抢占失败,进入队列排队(位于等待共享锁的线程之后)。

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            // 永远允许获取独占锁
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            // 这个方法用来防止等待获取独占锁的线程饥饿
            // 后面详细介绍
            return apparentlyFirstQueuedIsExclusive();
        }
    }
    

    问题:apparentlyFirstQueuedIsExclusivehasQueuedPredecessors在行为上会带来什么区别?
    比方说AQS队列中的node为:
    独占,共享,共享,独占。

    第一个独占锁释放后,紧随的两个共享锁未来的及唤醒的时候,如果又有队列外线程要求获得共享锁,会抢在队列末尾的独占锁之前(apparentlyFirstQueuedIsExclusive返回false,因为这时候队列head之后的node为共享)。这点和公平锁的行为是不同的(hasQueuedPredecessors方法在这种情况下返回的是true)。

    apparentlyFirstQueuedIsExclusive方法的代码如下:

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        // 如果head节点的下一个节点是独占类型节点,并且不是空节点(s.thread != null)返回true
        // 对于共享锁,可以直接获取,无需进入等待队列
        // 所以说,如果有线程要求独占锁进入队列排队,它一定是紧随head之后的节点
        // 如果返回true,说明有线程需要获取独占锁
        // 为了需要独占锁的线程避免线程饥饿,在此之后需要获取共享锁的线程,不允许直接获取到锁,必须排队等待之前排队的独占锁获取并释放之后才能恢复运行
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
    

    本文为原创内容,欢迎大家讨论、批评指正与转载。转载时请注明出处。

    相关文章

      网友评论

          本文标题:JUC原理之ReentrantReadWriteLock

          本文链接:https://www.haomeiwen.com/subject/trafyhtx.html