美文网首页
ReentrantReadWriteLock 源码分析

ReentrantReadWriteLock 源码分析

作者: 黑小鹰 | 来源:发表于2019-01-10 22:47 被阅读5次

    阅读建议:虽然我这里会介绍一些 AQS 的知识,不过如果你完全不了解 AQS,看本文就有点吃力了

    目录:
    1.简介
    2.一个缓存示例说明读写锁的使用方式
    3.读写锁的实现分析
    3.1 读写状态的设计
    3.2 ReentrantReadWriteLock 总览
    3.3写锁的获取
    3.4写锁的释放
    3.5读锁的获取
    3.6读锁的释放
    4.锁降级


    1.简介

    ReentrantLock实现是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。

    一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是ReentrantReadWriteLock


    ReentrantReadWriteLock的特性

    2.一个缓存示例说明读写锁的使用方式

    public class Cache {
        private static final Map<String, Object>    map = new HashMap<String, Object>();
        private static final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        private static final Lock                   r   = rwl.readLock();
        private static final Lock                   w   = rwl.writeLock();
        // 获取一个key对应的value
        public static final Object get(String key) {
            r.lock();
            try {
                return map.get(key);
            } finally {
                r.unlock();
            }
        }
        // 设置key对应的value,并返回旧的value
        public static final Object put(String key, Object value) {
            w.lock();
            try {
                return map.put(key, value);
            } finally {
                w.unlock();
            }
        }
        // 清空所有的内容
        public static final void clear() {
            w.lock();
            try {
                map.clear();
            } finally {
                w.unlock();
            }
        }
    }
    

    Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的读锁和写锁来保证Cache是线程安全的。在读操作get(String key)方法中,需要获取读锁,这使得并发访问该方法时不会被阻塞。写操作put(String key,Object value)方法和clear()方法,在更新HashMap时必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而只有写锁被释放之后,其他读写操作才能继续。Cache使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性,同时简化了编程方式。


    3.读写锁的实现分析

    3.1 读写状态的设计

    读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。回想ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。

    ReentrantReadWriteLock 使用一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写


    读写锁状态的划分方式

    当前同步状态表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。读写锁是如何迅速确定读和写各自的状态呢?答案是通过位运算。假设当前同步状态值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。

    3.2 ReentrantReadWriteLock 总览

    ReadLock 和 WriteLock 的代码提出来一起看,清晰一些:



    ReadLock 和 WriteLock 中的方法都是通过 Sync 这个类来实现的。Sync 是 AQS 的子类,然后再派生了公平模式和不公平模式。

    从它们调用的 Sync 方法,我们可以看到: ReadLock 使用了共享模式,WriteLock 使用了独占模式。

    等等,同一个 AQS 实例怎么可以同时使用共享模式和独占模式???

    这里给大家回顾下 AQS,我们横向对比下 AQS 的共享模式和独占模式:


    3.3写锁的获取

    写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态,获取写锁的代码如下

            @ReservedStackAccess
            protected final boolean tryAcquire(int acquires) {
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                     // 存在读锁或者当前获取线程不是已经获取写锁的线程
                    //也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // 这里不需要 CAS,能到这里的,只可能是写锁重入,不然在上面的 if 就拦截了
                    setState(c + acquires);
                    return true;
                }
                 // 如果写锁获取不需要 block,那么进行 CAS,成功就代表获取到了写锁
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    

    该方法除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。

    写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。

    3.4写锁的释放

    血锁的释放,是线程安全的,因为写锁是独占锁,具有排他性,所以写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见

            @ReservedStackAccess
            protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;
                // 如果 exclusiveCount(nextc) == 0,也就是说包括重入的,所有的写锁都释放了
                // 那么返回 true,这样会进行唤醒后继节点的操作。
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                setState(nextc);
                return free;
            }
    
    3.5读锁的获取

    读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。

    获取读锁的实现从Java 5到Java 6变得复杂许多,主要原因是新增了一些功能,例如getReadHoldCount()方法,作用是返回当前线程获取读锁的次数。读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLocal中,由线程自身维护,这使获取读锁的实现变得复杂。因此,这里将获取读锁的代码做了删减,保留必要的部分

            @ReservedStackAccess
            protected final int tryAcquireShared(int unused) {
                /*
                 * Walkthrough:
                 * 1. If write lock held by another thread, fail.
                 * 2. Otherwise, this thread is eligible for
                 *    lock wrt state, so ask if it should block
                 *    because of queue policy. If not, try
                 *    to grant by CASing state and updating count.
                 *    Note that step does not check for reentrant
                 *    acquires, which is postponed to full version
                 *    to avoid having to check hold count in
                 *    the more typical non-reentrant case.
                 * 3. If step 2 fails either because thread
                 *    apparently not eligible or CAS fails or count
                 *    saturated, chain to version with full retry loop.
                 */
                Thread current = Thread.currentThread();
                int c = getState();
                //1.如果另一个线程持有写锁,则失败。
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                // 读锁的获取次数
                int r = sharedCount(c);
                // 读锁获取是否需要被阻塞
                if (!readerShouldBlock() &&
                    // 判断是否会溢出 (2^16-1,没那么容易溢出的)
                    r < MAX_COUNT &&
                     // 下面这行 CAS 是将 state 属性的高 16 位加 1,
                    //低 16 位不变,如果成功就代表获取到了读锁
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    // r == 0 ->此线程是第一个获取读锁的
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    //  第一个获取 readLock 的是 current 线程, 直接计数器加 1
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
    //  先从 cachedHoldCounter拿数据, 数据不对的话, 再从readHolds拿数据
                        if (rh == null ||
                            rh.tid != LockSupport.getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
               //代码调用 fullTryAcquireShared
                return fullTryAcquireShared(current);
            }
    

    fullTryAcquireShared 这个方法其实是 tryAcquireShared 的冗余(redundant)方法, 主要补足 readerShouldBlock 导致的获取等待 和 CAS 修改 AQS 中 state 值失败进行的修补工作

    3.6读锁的释放
    protected final boolean tryReleaseShared(int unused){ 
    Thread current = Thread.currentThread();
     //判断现在进行 release 的线程是否是 firstReader
                if(firstReader == current){                      
                    // assert firstReaderHoldCount > 0
                   // 只获取一次 readLock 直接置空 firstReader
                    if(firstReaderHoldCount == 1){            
                        firstReader = null;
                    }else{
                       3. 将 firstReaderHoldCount 减 1
                        firstReaderHoldCount--;             
                    }
                }else{
                     //  先通过 cachedHoldCounter 来取值
                    HoldCounter rh = cachedHoldCounter;        
     //  cachedHoldCounter 代表的是上次获取 readLock 的线程, 若这次进行 release 的线程不是, 再通过 readHolds 进行 lookup 查找
                    if(rh == null || rh.tid != getThreadId(current)){ 
                        rh = readHolds.get();
                    }
    
                    int count = rh.count;
     // count <= 1 时要进行 ThreadLocal 的 remove , 不然容易内存泄露
                    if(count <= 1){
                        readHolds.remove();                    
                        if(count <= 0){
                           // 并发多次释放就有可能出现
                            throw unmatchedUnlockException();  
                        }
                    }//HoldCounter.count 减 1
                    --rh.count;                                
                }         
                    for(;;){                                       
     // 这里是一个 loop CAS 操作, 因为可能其他的线程此刻也在进行 release操作
                    int c = getState();
                    int nextc = c - SHARED_UNIT;            
     //  这里是 readLock 的减 1, 也就是 aqs里面state的高 16 上进行 减 1
                    //所以 减 SHARED_UNIT
                    if(compareAndSetState(c, nextc)){
                        /**
                         * Releasing the read lock has no effect on readers,
                         * but it may allow waiting writers to proceed if
                         * both read and write locks are now free
                         */
                        return nextc == 0;                   
    // 返回值是判断 是否还有 readLock 没有释放完, 当释放完了会进行后继节点的 
    //唤醒( readLock 在进行获取成功时也进行传播式的唤醒后继的 获取 readLock 的节点)
                    }
                }
            }
    
    

    读锁释放的过程还是比较简单的,主要就是将 hold count 减 1,如果减到 0 的话,还要将 ThreadLocal 中的 remove 掉。

    然后是在 for 循环中将 state 的高 16 位减 1,如果发现读锁和写锁都释放光了,那么唤醒后继的获取写锁的线程。


    4.锁降级

    锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。

    接下来看一个锁降级的示例。因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据的准备工作

        public void processData() {
            readLock.lock();
            if (!update) {
                // 必须先释放读锁
                readLock.unlock();
                // 锁降级从写锁获取到开始
                writeLock.lock();
                try {
                    if (!update) {
                    // 准备数据的流程(略)
                    update = true;
                    }
          readLock.lock();
          } finally {
            writeLock.unlock();
      }
         // 锁降级完成,写锁降级为读锁
      }try {
            // 使用数据的流程(略)
        } finally {
            readLock.unlock();
        }
    }
    

    锁降级中读锁的获取是否必要呢?答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。

    RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。

    参考文献:
    [1] ReentrantReadWriteLock 源码分析(基于Java 8)
    [2] Java 读写锁 ReentrantReadWriteLock 源码分析
    [3] Java的ReadWriteLock实现机制解析(一)
    [4] 《Java并发编程艺术》

    相关文章

      网友评论

          本文标题:ReentrantReadWriteLock 源码分析

          本文链接:https://www.haomeiwen.com/subject/tqgorqtx.html