美文网首页
ReentrantReadWriteLock代码浅析

ReentrantReadWriteLock代码浅析

作者: 有个点丶 | 来源:发表于2019-07-25 14:32 被阅读0次

    介绍

    除了重入锁ReentrantLock以外,Doug Lea大神还顺带实现了读写重入锁ReentrantReadWriteLock,依旧支持重入特性、公平与非公平模式,分出了读锁和写锁。

    读锁

    当读锁被持有后,会阻止其他线程获取写锁,但读锁不排斥其他线程持有读锁。

    写锁

    当写锁被持有后,既排斥其他线程申请读锁,还排斥其他线程申请写锁。

    结构

    具体的结构关系,直接上图:


    流程

    通过ReentrantReadWriteLock的readLock()获取读锁,writeLock()获取写锁,读锁和写锁都有lock()、lockInterruptibly()、tryLock()、tryLock(long timeout, TimeUnit unit),unlock()。

    lock()方法,遵循初始化时的公平锁或非公平锁模式请求锁,请求锁的过程不会被中断,直到持有锁为止。

    lockInterruptibly()方法,如果线程调用了interrupted(),请求锁的过程将会中断,并且抛出InterruptedException异常。如果线程没有被中断,那么会坚持到持有锁为止。

    tryLock()方法,无视初始化时公平与非公平模式,直接尝试一次请求锁的CAS操作,如果成功,返回true,失败,返回false。

    tryLock(long timeout, TimeUnit unit)方法,类似lockInterruptibly(),但是比lockInterruptibly()多了超时检查操作,当超时时,会中断请求锁过程。
    unlock()方法,释放锁对象当前持有的锁一次。

    申请读锁

    因为请求写锁锁的流程与普通重入锁大同小异,所以这里只展示读锁的lock()方法的请求流程,lock()方法中通过sync调用acquireShared()方法,acquireShared()方法的实现在AbstractQueuedSynchronizer中:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    

    所有读锁会进行一次tryAcquireShared(int arg)doAcquireShared(int arg)。获取写锁则调用acquire(int arg)方法,方法中if条件中调用tryAcquire(arg)acquireQueued(final Node node, int arg),与读锁获取的流程有些小差别。

    回到正题,tryAcquireShared(int arg)方法被Sync实现,再看ReentrantReadWriteLock.Sync中:

    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    

    这个方法中有两个比较重要的流程,都会在特定条件下返回,先看看进入这两个流程的if条件。
    第一个if条件,exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current),先检查写锁重入数,如果不为0,再检查当前线程是否已经持有写锁,如果未持有写锁则返回-1表示失败,那么可以看出,同一个线程是可以同时持有写锁和读锁的。

    第二个if条件,!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)
    首先如果是公平锁,那么在readerShouldBlock()中会进入阻塞队列等待,如果非公平锁,就调用apparentlyFirstQueuedIsExclusive()方法检查队列第一个节点是否在等待写锁,如果不是,再来检查读锁持有是否超过最大值,如果未超过,再来CAS修改读锁持有数量,CAS操作成功之后,这个if流程中会修改部分引用和缓存信息,这个具体作用后面再说。

    在公平锁模式阻塞重新被唤醒后或者已有其他线程持有读锁,又或者读锁当前持有数现在已经到了MAX_COUNT时,都会跳过第二个if流程直接进入fullTryAcquireShared(Thread current)方法中:

    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
                //这里注释写了一种造成读写锁死锁的情况
                //当前线程持有写锁,而队列中有其他线程正在等待写锁释放时,这个时候再来请求读锁时,就会导致死锁
            } else if (readerShouldBlock()) {
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    //省略部分代码
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                //省略部分代码
                }
                return 1;
            }
        }
    }
    

    fullTryAcquireShared(Thread current)方法中的请求锁的过程其实和tryAcquireShared(int unused)中的过程十分相似,最重要的还是CAS操作的这个方法。只有compareAndSetState()成功之后才会结束fullTryAcquireShared(Thread current)方法中for循环,结束这次读锁请求操作。

    exclusiveCount(int c)与sharedCount(int c)

    fullTryAcquireShared(Thread current)方法与tryAcquireShared(int unused)方法中中,变量c,就是持有锁的个数,通过getState()获取到值后,先exclusiveCount(int c),再经过sharedCount(int c)后赋值给r,找到和它们有关的代码:

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);// 65536
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;// 65535
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 65535
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    

    exclusiveCount(int c)操作会清除整型变量c高16位的所有数值,保留低16位数值。
    sharedCount(int c) 操作清除整型变量c低16位,保留高16位数值。
    所以r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)就是在验证写锁持有数量,当c = 0c + SHARED_UNIT = 65536,经过sharedCount(int c)计算后得1,c + SHARED_UNIT相当于对 c = (c >>> 16 + 1) << 16,对c高16位数值加1。
    也就是说state值的高位保存着读锁数量,低位保存写锁数量,读锁与读锁最大持有数都是65535。

    关于读写锁等待队列

    不管是公平锁还是非公平锁,未能在其他线程释放锁时的特殊时刻正好获取到锁,都会进入等待锁队列,而这个队列被读锁与写锁共用,具体逻辑很多与重入锁高度重合,有兴趣可以看我写的这篇博客ReentrantLock源码通读(一)

    关于HoldCounter

    HoldCounter遍布在所有读锁的请求和释放过程当中,并且为了缓存这个读锁持有数,代码占据了这些流程代码量大概两成左右,所以HoldCounter的重要性不言而喻,那么具体是什么作用呢?
    在ReentrantReadWriteLock中对外提供了一个方法:

    /**
     * Queries the number of reentrant read holds on this lock by the
     * current thread.  A reader thread has a hold on a lock for
     * each lock action that is not matched by an unlock action.
     *
     * @return the number of holds on the read lock by the current thread,
     *         or zero if the read lock is not held by the current thread
     * @since 1.6
     */
    public int getReadHoldCount() {
        return sync.getReadHoldCount();
    }
    
    //Sync中getReadHoldCount()
    final int getReadHoldCount() {
        if (getReadLockCount() == 0)
            return 0;
    
        Thread current = Thread.currentThread();
        if (firstReader == current)
            return firstReaderHoldCount;
    
        HoldCounter rh = cachedHoldCounter;
        if (rh != null && rh.tid == getThreadId(current))
            return rh.count;
    
        int count = readHolds.get().count;
        if (count == 0) readHolds.remove();
        return count;
    }
    

    注释翻译过来就是,查询当前线程重入读锁的次数。不是可以通过Sync中的state全局变量获取重入次数吗,读锁与写锁是通过int的高低位来区分的,十分方便,为什么不能用这个state来获取读锁的重入次数呢?
    事实上写锁的确是利用state的来获取写锁的重入次数的,但是写锁是一个线程独占的,它会排斥其他线程获取写锁以及读锁。
    读锁,因为其可被共享的特殊性质,导致state中高位存储的是多个线程重入后的总数,所以state无法用来查询当前线程重入读锁的次数,当前线程无法知晓自己重入次数,这在某些复杂场景或者使用不规范的情况下,可能造成读锁无法被正常释放,而且排查起来也异常困难。
    所以HoldCounter出现了,与HoldCounter配合当然少不了ThreadLocal,ReentrantReadWriteLock.Sync中包含有HoldCounter静态内部类和ThreadLocalHoldCounter静态内部类。
    HoldCounter记录了重入计数与线程的tid。
    ThreadLocalHoldCounter继承ThreadLocal,重载了initialValue()方法。
    并且为了更快的获取重入数,还额外赠送了两个全局变量,cachedHoldCounter和firstReaderHoldCount。
    cachedHoldCounter保存上个请求读锁线程的HoldCounter。
    firstReaderHoldCount保存第一个获取读锁线程的重入数。

    所以Sync实现的getReadHoldCount()方法根据return,分为四个分支,通过各种全局缓存变量,尽量高效的返回结果。
    第一个分支,读取state的高位,如果为0返回。
    第二个分支,检查当前线程是否等于firstReader,如果是,返回firstReaderHoldCount
    第三个分支,检查cachedHoldCounter中的tid是否等于当前线程的tid,如果是,返回cachedHoldCounter.count
    第四个分支,就是遍历ThreadLocal中的threadMap找到当前线程的HoldCounter返回count

    思考

    一个线程是否可以同时获取到写锁与读锁?
    肯定可以,不然也不会在注释里说明会产生死锁的场景了。
    根据写锁的tryAcquire(int arg)源码:

    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
    

    先判断了state != 0,再判断w==0 || current != getExclusiveOwnerThread()直接堵死线程获取读锁在获取写锁的可能性。
    所以能同时获取读锁和写锁的操作只能是先获取写锁,然后获取读锁。
    如何能成功同时获取读锁和写锁呢?
    在非公平模式下,线程持有写锁后,去请求读锁,而此时等待线程队列中第一个节点不是请求写锁的线程,即可成功获取读锁与写锁,但如果是请求写锁的线程,会返回失败状态值-1,之后进入等待线程队列,线程在持有写锁的情况下休眠并且等待通知被唤醒,由于唤醒超时线程的触发点实在有线程释放锁之后,所以这时无论是否设置过超时参数,都是无效的,依然会导致死锁,所以在使用时需要格外注意。

    总结

    读锁可以与其他线程共享,但排斥其他线程获取写锁。
    写锁排斥其他线程获取锁,当前持有写锁后,仍然可以去获取读锁,完全独占,但是写锁竞争激烈时,完全独占的操作也很容易导致死锁。
    读写锁的持有或重入上限均是65535。
    当前线程重入计数,读写锁是分开保存的,state中保存的是读锁重入的总数,写锁不受影响。
    当前线程读锁重入次数是使用ThreadLocal保存的,每个线程单独维护一个HoldCounter。使用的三个全局变量缓存特殊状态的线程重入计数变量,尽量快的返回结果。

    相关文章

      网友评论

          本文标题:ReentrantReadWriteLock代码浅析

          本文链接:https://www.haomeiwen.com/subject/efnlaqtx.html