美文网首页Java高开发GitHub 中文社区Spring源码分析
ReentrantReadWriteLock(可以重入的读写锁)

ReentrantReadWriteLock(可以重入的读写锁)

作者: java高并发 | 来源:发表于2019-07-26 17:20 被阅读2次

    一、ReentrantReadWriteLock简介

    上一篇文章我们将讲到的ReentrantLock和Synchronized锁,都属于排他锁,也就是说只会有一个线程获取锁;而我们今天讲的ReentrantReadWriteLock(读写锁)是支持多个线程同时获取锁的在获取读锁时;但是在获取到写锁时,其它的写锁和读锁都会阻塞;其实可以看出读写锁,维护了一对锁,一个写锁,其实是个排他锁,一个读锁,是共享锁;通过分离读锁和写锁,使得并发性相比一般的排他锁有很大的提升;读写锁的性能比排他锁好,因为在大多数场景中读是多于写的;读写锁提供了,公平性的选择、重新进入(该锁支持重进入,以读写锁线程为例:读线程在获取读锁之后,能够再次获取读锁。而写线程在获取了写锁之后能够再次获取写锁,同时也可以获取读锁)和锁降级(遵循获取写锁、获取读锁在释放写锁的次序,写锁能够降级成为读锁)等特性。

    二、ReentrantReadWriteLock基本成员
    我们先来看一张ReentrantReadWriteLock类图

    Sync是一个内部类,继承AQS,主要实现AQS的一些方法。


    ReentrantReadWriteLock(可以重入的读写锁)源码浅析

    基本成员简介

         static final int SHARED_SHIFT   = 16;
            // 这个是读锁的原始累加值(也就是说每次获取读锁都是获取状态state,然后用state加它),是2^16
            // 举个例子,假设现在state为1,那么现在来获取读锁就是1+SHARED_UNIT
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            // 读锁和写锁的最大数量,都是2^16 - 1
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            // 写锁的掩码,其实就是2^16 - 1,这个数的二进制很特殊,16位全是1
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
            /** Returns the number of shared holds represented in count  */
            // 读锁的数量
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            /** Returns the number of exclusive holds represented in count  */
            // 写锁的数量
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
                    // 记录每个线程获取读锁的数量
            // count是数量
            // tid是线程的唯一标识
            static final class HoldCounter {
                int count = 0;
                // Use id, not reference, to avoid garbage retention
                final long tid = getThreadId(Thread.currentThread());
            }
    
                    // 继承ThreadLocal,主要用来存储HoldCounter
            static final class ThreadLocalHoldCounter
                    extends ThreadLocal<java.util.concurrent.locks.ReentrantReadWriteLock.Sync.HoldCounter> {
                public java.util.concurrent.locks.ReentrantReadWriteLock.Sync.HoldCounter initialValue() {
                    return new java.util.concurrent.locks.ReentrantReadWriteLock.Sync.HoldCounter();
                }
            }
    
                    // 可以理解为最后一个获取读锁的线程(用于优化性能,不需要去ThreadLocal查找)
                     private transient HoldCounter cachedHoldCounter;
    
                     // 第一个获取读锁的线程和读锁的数量,作用是如果是第一个,不走ThreadLocal
                     private transient Thread firstReader = null;
                     private transient int firstReaderHoldCount;
    

    解释下读写锁的状态计算,state一个数,怎么控制的读和写
    先来看写锁,先来看写锁的掩码EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1,这个数是65535,二进制就是16个1,我们看见获取写锁的个数c & 65535(exclusiveCount这个方法,c代表就是state,锁的状态,不理解可以看看AQS对state的定义),但是由于二进制16位是1,所以只要c在0-65535这个范围,取&都还是c(由于这个掩码的特殊性),根据写锁的定义只能有一个线程获取写锁,写锁获取了就要阻塞其它线程获取读或者写,怎么判断了,其实只要判断c& 这个二进制是不是等于0就可以了,所以了写锁的范围其实就是0-65535,其实二进制的范围就是低16位(因为最大数量是MAX_COUNT = 2^16-1)。
    再来分析下读锁,读锁我们主要关注下SHARED_UNIT就可以,获取锁其实是用c+SHARED_UNIT(c代表的就是sate),释放锁是用c-SHARED_UNIT,这个数是 65536(SHARED_UNIT 其实就是 2 ^ 16),我们每次获取读锁其实就是把SHARED_UNIT累加,其实我们可以把SHARED_UNIT的一次累加就当做一次读锁的获取,我们看下读锁的值得范围是0 - 负65536(负数和int的最大值有关,int正数的最大值是2147483647,在给它累加其实就会变为负数,最后最大其实就是高16位全是1,因为读锁的最大数量是MAX_COUNT = 2^16-1,所以其实读锁的范围可以看做是高16位),获取读锁的个数就是无符号右移16位(就是sharedCount方法),因为可能是负数。

    ReentrantReadWriteLock(可以重入的读写锁)源码浅析

    上面的图片其实就是一个读锁,一个写锁,这种情况只有在同一个线程才会发生,如果你能算出一个写锁和读锁,说明你基本理解了读写锁状态控制的运算方法。
    ReadLock和WriteLock,内部类,继承Lock,提供一些锁的方法。
    FairSync和NonfairSync,内部类,是继承Sync,主要实现公平和非公平的一些方法

    三、ReentrantReadWriteLock基本方法
    1)、构造方法

    // 无参,默认非公平
    public ReentrantReadWriteLock() {
            this(false);
        }
            // 有参
            public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
    

    2)、WriteLock:写锁一些方法,如下图:


    ReentrantReadWriteLock(可以重入的读写锁)源码浅析

    ①、获取锁lock方法,我们可以看出调用的是sync.acquire方法,由于sync继承自AQS所以调用的其实是AQS的acquire,但是AQS的tryAcquire需要子类自己实现,所以我们看看tryAcquire(可以看出是个独占方法,也符合写锁的定义,只会有一个线程获取)。

    public void lock() {
                sync.acquire(1);
            }
    
    

    子类sync的tryAcquire

    // 写锁的状态控制(state)
            protected final boolean tryAcquire(int acquires) {
                /*
                 * Walkthrough:
                 * 1\. If read count nonzero or write count nonzero
                 *    and owner is a different thread, fail.
                 * 2\. If count would saturate, fail. (This can only
                 *    happen if count is already nonzero.)
                 * 3\. Otherwise, this thread is eligible for lock if
                 *    it is either a reentrant acquire or
                 *    queue policy allows it. If so, update state
                 *    and set owner.
                 */
                // 获取当前线程
                Thread current = Thread.currentThread();
                // 获取当前锁的状态
                int c = getState();
                // 获取写锁的状态,c & (2的16次方-1)
                // 2的16次方-1 其实就是65535,变成二进制就是16个1
                int w = exclusiveCount(c);
                // c不等于0,证明有线程获取锁了,不管是读锁或者写锁
                if (c != 0) {
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    // w == 0,说明有读锁了,w!= 0证明有写锁
                    // current != getExclusiveOwnerThread()说明有写锁了,不是自己
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    // 证明了是自己获取了写锁,如果大于锁的最大数量,抛异常
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    // 说明没有超出限制,可以重入
                    setState(c + acquires);
                    return true;
                }
                // 走到这一步,证明还没有线程获取锁
                // writerShouldBlock 现在公平还是非公平,由FairSync和NonfairSync实现这个方法
                if (writerShouldBlock() ||
                        // cas设置所得状态
                        !compareAndSetState(c, c + acquires))
                    //失败或者公平锁在我的前面有排队节点
                    return false;
                // 设置拥有锁的线程
                setExclusiveOwnerThread(current);
                return true;
            }
    

    在上面的写锁获取锁时我们需要注意下writerShouldBlock这个方法,它是实现公平和非公平的关键,它的公平和非公平的方法实现不同,公平是需要确认自己前面是否有排队节点,非公平直接返回false,具体查看这个方法。
    ②、写锁释放锁:unlock方法,它是其实也是调用的也是调用AQS的release方法,我们直接看子类的实现吧。

    public void unlock() {
                sync.release(1);
            }
    

    子类sync的tryRelease方法

    protected final boolean tryRelease(int releases) {
                // 判断是否获取锁的是这个线程
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                // 释放锁,修改state
                int nextc = getState() - releases;
                // 判断写锁是否完全释放
                boolean free = exclusiveCount(nextc) == 0;
                // 完全释放,修改锁的拥有者为null
                if (free)
                    setExclusiveOwnerThread(null);
                // cas状态
                setState(nextc);
                return free;
            }
    

    ③、我们发现WriteLock里面还有一些获取锁的方法,lockInterruptibly响应中断,tryLock(long timeout, TimeUnit unit)支持中断并且带超时时间,其实都是调用了AQS里面的这些方法,然后获取锁这部分的实现都是调用的子类sync的tryAcquire方法。
    3)、ReadLock:读锁的一些方法,如下图


    ReentrantReadWriteLock(可以重入的读写锁)源码浅析

    ①、获取锁lock方法,调用过程和写锁一样,先走AQS,不过这一次调用的是共享锁的方法acquireShared,然后AQS在调用子类sync的实现方法。

    public void lock() {
                sync.acquireShared(1);
            }
    

    sync的tryAcquireShared

    protected final int tryAcquireShared(int unused) {
                /*
                 * Walkthrough:
                 * 1\. If write lock held by another thread, fail.
                 * 2\. Otherwise, this thread is eligible for
                 *    lock wrt state, so ask if it should block
                 *    because of queue policy. If not, try
                 *    to grant by CASing state and updating count.
                 *    Note that step does not check for reentrant
                 *    acquires, which is postponed to full version
                 *    to avoid having to check hold count in
                 *    the more typical non-reentrant case.
                 * 3\. If step 2 fails either because thread
                 *    apparently not eligible or CAS fails or count
                 *    saturated, chain to version with full retry loop.
                 */
                // 获取当前线程
                Thread current = Thread.currentThread();
                // 获取当前锁状态
                int c = getState();
                // exclusiveCount(c) != 0 有线程获取了写锁
                // 并且这个获取写锁的线程不是自己
                if (exclusiveCount(c) != 0 &&
                        getExclusiveOwnerThread() != current)
                    return -1;
                 // 现在只会有三种清理
                // 1、还没有任何线程获取锁,不管是读锁还是写锁
                // 2、有线程获取到了读锁
                // 3、获取写锁的线程时自己
                int r = sharedCount(c);
                //  readerShouldBlock 由FairSync和NonfairSync实现
                // FairSync 判断是否前面有排队节点
                // NonfairSync排队节点是否有写节点
                if (!readerShouldBlock() &&
                        r < MAX_COUNT &&
                        compareAndSetState(c, c + SHARED_UNIT)) {
                    // r == 0证明还没有获取到锁
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                        // 重入锁
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        // 获取最后一个获取锁的HoldCounter
                        java.util.concurrent.locks.ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
                        // 最后一个HoldCounter是空或者不是本线程,就设置一个
                        if (rh == null || rh.tid != getThreadId(current))
                            // 其实这一步做了两件事情,其实是先set了一个HoldCounter,然后在get之后设置给rh
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0) // 理解不了什么时候会是0
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                // 执行fullTryAcquireShared方法有几种情况了
                // 1.获取锁的下一个节点还是写锁,需要等待
                // 2.到达获取锁的最大数量
                // 3.可能存在多线程进程来设置读锁,cas失败了
                return fullTryAcquireShared(current);
            }
    
                    final int fullTryAcquireShared(Thread current) {
                /*
                 * This code is in part redundant with that in
                 * tryAcquireShared but is simpler overall by not
                 * complicating tryAcquireShared with interactions between
                 * retries and lazily reading hold counts.
                 */
                java.util.concurrent.locks.ReentrantReadWriteLock.Sync.HoldCounter rh = null;
                for (;;) { // 自旋获取锁
                    int c = getState(); // 获取锁状态
                    if (exclusiveCount(c) != 0) { // 判断有没有写锁,不等于0证明有写锁
                        if (getExclusiveOwnerThread() != current) // 写锁不是自己
                            return -1;  // 返回
                            // 写锁时自己,其实就可以获取,其实就是锁降级(可以看做是一种特殊的重入锁)
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
    
                        // 到下面这个else if证明没有写锁
                        //  readerShouldBlock 由FairSync和NonfairSync实现公平和非公平原则
                        // FairSync 判断是否前面有排队节点
                        // NonfairSync排队节点是否有写节点
                    } else if (readerShouldBlock()) { // 到这一步证明了,是公平锁或者非公平锁的头结点.next是写锁,
                                                        // 此线程需要进入同步队列了,下面就是判断这个线程有没有获取过锁
                        // Make sure we're not acquiring read lock reentrantly
                        // 第一个获取锁的是当前线程,证明可以重入
                        if (firstReader == current) {
                            // assert firstReaderHoldCount > 0;
                        } else {
                            // 进去这里说明,firstReader不是当前线程,那就说明获取读锁的不止一个了,因为firstReader不可能为null
                            // 获取最后一个获取读锁的HoldCounter
                            if (rh == null) { // rh == null 只会是第一次循环
                                rh = cachedHoldCounter; // 获取缓存的HoldCounter
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    // 从 ThreadLocal 中取出计数器,如果没有就会重新创建并设置
                                    rh = readHolds.get();
                                    if (rh.count == 0) // 那就证明没有获取到读读锁
                                        readHolds.remove(); // 删除这个
                                }
                            }
                            if (rh.count == 0) // 这个是上面刚刚创建的证明获取锁失败了,需要进入队列
                                return -1;
                        }
                    }
                    // 获取读锁的数量==MAX_COUNT,抛异常
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
    
                    // 使用cas设置读锁的状态,下面逻辑和tryAcquireShared的逻辑一样
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        // 还没有获取读锁
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }
    

    在读锁获取锁时需要注意下readerShouldBlock这个方法,和写锁类似,这个方法也是主要实现公平与非公平的关键,非公平锁(NonfairSync)时需要注意,如果获取读锁时,需要执行apparentlyFirstQueuedIsExclusive这个方法,判断队列head的next是否是写锁(是写锁,让这个写锁先来,避免写锁饥饿,就是避免写线程获取不到锁,所以写有很高的优先权),则自己获取读锁就需要排队,公平锁(FairSync)实现则还是需要判断队列里面是否有节点在排队。
    ②、释放锁unlock,调用AQS的releaseShared方法,我们主要关注子类实现

     public void unlock() {
                sync.releaseShared(1);
            }
    

    子类sync的tryReleaseShared方法

    protected final boolean tryReleaseShared(int unused) {
                // 获取当前线程
                Thread current = Thread.currentThread();
                // 如果第一个读锁拥有者是当前线程
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                    // 读锁的数量为1,没有重入
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        // 重入了,修改数量
                        firstReaderHoldCount--;
                    // 读锁已经不是一个了
                } else {
                    // 获取最后一个缓存
                    java.util.concurrent.locks.ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
                     // 获取缓存HoldCounter
                    if (rh == null || rh.tid != getThreadId(current))
                        // 不是就获取
                        rh = readHolds.get();
                    // 获取读锁线程的数量
                    int count = rh.count;
                    // 如果小于等于1
                    if (count <= 1) {
                        // 删除这个线程的HoldCounter
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    // 读锁数量递减
                    --rh.count;
                }
                for (;;) {
                    // 获取状态stase
                    int c = getState();
                                        // 读锁状态减1(其实就是减SHARED_UNIT)
                    int nextc = c - SHARED_UNIT;
                    // cas设置线程状态
                    if (compareAndSetState(c, nextc))
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        // 为什么nextc == 0才会返回true了
                        // nextc == 0代表了什么了,没有读锁了
                        // 返回true,就代表可以去唤醒下一个线程了,但是队列的线程是写线程或者线程了,不确定
                        // 但是这个对读线程是没有影响的,但是对写锁是有影响的,我们想象一下什么情况下才会下个节点会是写锁获取的线程了
                        // 其实就是已经有线程获取了写锁,因为有线程获取了写锁,所以可能发生锁降级,写锁降级为读锁
                        // 为了保护锁降级的语义,所以必须保护读锁,直到没有读锁了才会去唤醒后面可能的写锁,也就是返回true
                        return nextc == 0;
                }
            }
    

    ③、lockInterruptibly和tryLock(long timeout, TimeUnit unit)和写锁的这些方法作用一样。
    四、总结

    ReentrantReadWriteLock读写锁,要想学习好这个类,就必须了解什么是读锁,什么是写锁,怎么区别读锁和写锁,因为我们都知道锁都是通过AQS的state来控制的,但是现在是两个锁,所以理解ReentrantReadWriteLock对state拆分的运算很重要,也就是二进制的低16位是写锁,高16位是读锁,也不得不说大神设计让人叹为观止啊;还有就是理解读写锁的一些特性,重入指的就是同一个线程获取锁之后,再次调用lock方法不会被阻塞,但是注意调用几次lock,就要调用几次unlock,因为我们通过源码得知重入其实就是对state的累加,还有就是锁降级,我个人更愿意理解为特殊的重入,锁降级就是一个线程先获得写锁,然后这个线程再去获取读锁,这样不会阻塞,然后写锁其实就降级为了读锁,然后在释放写锁,最后释放读锁,作者为什么这么设计了,书上说的,锁降级主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新,如果当前线程获取读锁,即遵循锁降级的步骤,则线程T,就会阻塞,直到当前线程使用数据并释放读锁之后,线程T才会获得写锁并更新数据。个人理解这个锁降级其实是一种特殊的锁的优化策略,在我们需要在边写边读的这种业务场景中,保证数据可见性的同时(不让其他线程获取写锁),提升本线程读锁性能,因为不需要和写锁或者其他读锁(公平锁)去竞争获取锁,而是直接降级为读锁。

    欢迎工作一到五年的Java工程师朋友们加入JavaQQ群:219571750,群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!


    相关文章

      网友评论

        本文标题:ReentrantReadWriteLock(可以重入的读写锁)

        本文链接:https://www.haomeiwen.com/subject/ltvdrctx.html