美文网首页
ReentrantReadWriteLock源码解析

ReentrantReadWriteLock源码解析

作者: 神易风 | 来源:发表于2022-08-12 14:02 被阅读0次

    在使用synchronize关键字修饰方法后,只允许一个线程进行访问,这个虽然有利于保证数据安全,却实际场景背道而驰的。实际中数据都是读取多,写入少,我们需要更粗细粒的并发锁。JVM concurrent.locks包给我们提供ReadWriteLock读写锁,内置两把锁,读锁、写锁,满足多个线程并发读取数据,写入时互斥所有线程,既保证了数据安全,又提升了响应量。

    概念

    读锁: 可以理解成共享锁,允许多个线程同时读取
    写锁: 独占锁,有且只允许一个线程访问
    读写互斥: 在获取写锁时,必须等待所有读锁全部释放,才能获取成功,读锁会堵塞写锁,写锁会堵塞所有的线程。
    锁升级: 在使用读锁时,已经获取读锁线程在没有释放读锁的情况下,去获取写锁这就是锁升级。这是不被允许的,锁升级会造成死锁。

            // 这个会造成死锁
            ReadWriteLock  lock = new ReentrantReadWriteLock();
            lock.readLock().lock();
            lock.writeLock().lock();
    

    锁降级: 已经获取到写锁线程,被允许在没有释放锁的情况下去获取读锁的,值得注意读锁、写锁仍然需要单独释放。

            //并不会造成死锁
            ReadWriteLock lock = new ReentrantReadWriteLock();
            lock.writeLock().lock();
            lock.readLock().lock();
    

    使用官方例子演示ReentrantReadWriteLock 使用场景,每次获取缓存时,先判断缓存是否已经失效了,如果失效了使用写锁更新缓存。

     class CachedData {
        Object data;
        volatile boolean cacheValid; //缓存失效标记
        final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
          void processCachedData() {
          rwl.readLock().lock(); 
         if (!cacheValid) {
            // 必须释放了读锁才能去获取写锁,这样不会造成死锁
            rwl.readLock().unlock(); 
           rwl.writeLock().lock(); 
           try {
              // 双重检查状态,因为在获取锁的可能被其他线程更新状态了
              // 获取到写锁,更新缓存和状态
              if (!cacheValid) {
                data = ...
                cacheValid = true;
              }
              // 通过在释放写锁之前获取读锁来降级
              rwl.readLock().lock(); 
           } finally {
              rwl.writeLock().unlock();
     // Unlock write, still hold read 
           } 
         } 
           try {
            use(data); 
         } finally {
            rwl.readLock().unlock();
          } 
       } 
     }
    

    代码很少,但是非常有代表性,非常适合缓存这种读取多,更新少的场景。在每次读取缓存时,先开启读锁,检查缓存状况,需要更新缓存时。先释放读锁然后再去获取写锁,在更新前先判断缓存又没被其他线程更新过了,更新完数据后降级到读锁,再释放写锁,使用缓存释放读锁。

    源码解析

    这里源码分析只有简单讲解两个锁的获取、释放原理,看阅读源码之前,自备AQS的知识点。
    ReentrantReadWriteLock是实现ReadWriteLock接口的实现类,内部使用AQS的int state来表示读写锁的状态

    state.png
    如上图所示,两个锁的获取、释放都是同时使用int state来进行,使用低16位表示写锁获取次数、高16位表示读锁获取次数。使用内部类Sync 单独编写共享锁、独占锁的获取释放具体实现,再使用ReadLock、WriteLock分别调用共享锁、独占锁的方法。源码阅读先从Sync内部类开始。

    内部属性

        abstract static class Sync extends AbstractQueuedSynchronizer {
             //读 写 锁分界点
            static final int SHARED_SHIFT   = 16;
            //读锁最小单位,刚好表示当前拥有一个读锁线程
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            // 支持最大读取次数
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            //写锁掩码
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
            /** 计算当前获取共享锁数量  */
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            /** 计算当前获取独占锁数量  */
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
     //主要用于保存每一个读锁线程的重入次数
     static final class HoldCounter { //初始化对象,就将当前线程id赋值给tid
                int count = 0; //重入次数
                // Use id, not reference, to avoid garbage retention
                final long tid = getThreadId(Thread.currentThread());
            }
    
            /**
             *  保存HoldCounter到每一个线程私有栈祯
             */
            static final class ThreadLocalHoldCounter
                extends ThreadLocal<HoldCounter> {
                public HoldCounter initialValue() {  //实现初始化接口,每一次调用get()时,没有值就会调用初始化方法
                    return new HoldCounter();
                }
            }
    
            /**
             *  记录读锁重入次数
             */
            private transient ThreadLocalHoldCounter readHolds;
    
            /**
             *  这个是上一个读锁HoldCounter 缓存
             */
            private transient HoldCounter cachedHoldCounter;
    

    我这里认为读锁做一个共享锁在重入次数上,state不能准确表达出每一个线程到底重入了多少次,所以需要用到HoldCounter来记录每一个线程获取锁次数,在释放锁的时候,会看下如何使用的。

    共享锁的获取和释放

            protected final int tryAcquireShared(int unused) {
                Thread current = Thread.currentThread();
                int c = getState();
                // 当独占锁不等于0,这时只有独占锁是自身的情况下才能获取到读锁
                //两个条件都满足时,写锁获取到读锁   锁降级
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c); //持有共享锁数
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    //这里只对高位进行累加,设置成功就相当于获取锁成功了
                    compareAndSetState(c, c + SHARED_UNIT)) {  
                    if (r == 0) { //首次加锁   firstReader 必须是读锁线程
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) { //重入
                        firstReaderHoldCount++;
                    } else { //当前线程不是首个读锁持有者,要使用HoldCounter 记录重入
                        HoldCounter rh = cachedHoldCounter;  //这是上一个线程缓存
                        if (rh == null || rh.tid != getThreadId(current))
                             //这里会返回当前线程初始化值 也就是数量为空0
                            //将当前线程重入对象赋值给缓存
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0) //第一次进入
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                // cas 竞争失败,完整版本共享锁获取
                return fullTryAcquireShared(current);
            }
    

    readerShouldBlock 是一个队列堵塞策略方法,用于区分公平锁和非公平锁的实现,当返回true时,会堵塞所有获取读锁线程。

            final int fullTryAcquireShared(Thread current) {
                HoldCounter rh = null;
                for (;;) { //自旋获取锁,直到成功
                    int c = getState();
                    if (exclusiveCount(c) != 0) {  //这时已经是写锁状态
                        if (getExclusiveOwnerThread() != current)  //不是锁降级就退出循环
                            return -1;
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
                    } else if (readerShouldBlock()) { //当返回true,则说明已经存在堵塞线程,这是要么自旋,要么失败
                        // Make sure we're not acquiring read lock reentrantly
                        if (firstReader == current) { //重入获取读锁,这也是不行的
                            // assert firstReaderHoldCount > 0;
                        } else { //判断线程栈祯是否还有重入,如果出现重入自旋等待
                            if (rh == null) { 
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0) //锁已经释放了,不能算是重入了,直接失败了
                                return -1;
                        }
                    }
                    if (sharedCount(c) == MAX_COUNT) 
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) { //高16位运算,获取共享锁成功
                        if (sharedCount(c) == 0) { // 下面代码跟上面基本一致,略过.....
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }
    

    总结下: 当获取共享锁时,只有检测到独占锁时,获取锁方法会立即返回失败。
    看下共享锁释放

            protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) { 
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1) //锁已经退出,归还缓存
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) { //当前锁没有重入,直接删除
                        readHolds.remove();
                        if (count <= 0) //多次释放锁
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) { //自旋 锁数量减一
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        return nextc == 0;  //共享锁数据为0
                }
            }
    

    HoldCounter用于维护每一个线程释放锁数量,保证释放不会超过自身持有的数量。

    独占锁获取和释放

            protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                setState(nextc);
                return free;
            }
    
            protected final boolean tryAcquire(int acquires) {
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) { // c 不能等于0 ,当前仍然持有锁,有可能是独占锁或者是共享锁
                    // 如果独占锁为空0,则说明当前仍然有线程没有释放读锁,这个不满足写锁获取,直接失败
                    //w > 0 ,这是说明已经有线程获取独占锁了,这时必须是重入才会获取成功
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    //这里是重入了
                    setState(c + acquires);
                    return true;
                }
                //竞争获取锁 
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    

    writerShouldBlock:当返回true会堵塞获取锁的线程,用于区分公平锁和非公平锁实现。结合上面代码,当返回true时,不会去获取锁,直接失败了。
    独占锁释放

            protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())  //持有独占锁线程不是当前线程
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;  
                boolean free = exclusiveCount(nextc) == 0;
                if (free)  //所有锁都被释放了,可以将独占锁线程致空
                    setExclusiveOwnerThread(null);
                setState(nextc);
                return free;
            }
    

    公平锁和非公平锁

    ReentrantReadWriteLock内部有两个锁可以选择,公平锁和非公平锁。通过构造参数进行选择,默认使用非公平锁。

        public ReentrantReadWriteLock() {
            this(false);
        }
    
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
    
        public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
        public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    

    非公平锁: 在获取读锁或者写锁时,获取锁的线程并不是顺序的,在堵塞队列中的线程可能长期等待,获取不到锁,而没有在堵塞队列中等待线程反而能快速获取到锁,这个会造成线程饥饿,但是会比公平锁有更高的吞吐量。
    公平锁: 保证每一个等待最久线程最先获取到线程执行权,线程都会按照AQS堵塞顺序获取锁,这样有利于避免线程饥饿的产生,但是在在获取锁需要判断队列有一定性能损耗,所以吞吐量不如非公平高。

    公平锁和非公平锁区别在在于writerShouldBlock 、readerShouldBlock 方法实现不同而已。
    公平锁实现

        static final class FairSync extends Sync {
            private static final long serialVersionUID = -2274990926593161451L;
            final boolean writerShouldBlock() {
                return hasQueuedPredecessors();
            }
            final boolean readerShouldBlock() {
                return hasQueuedPredecessors();
            }
        }
    

    hasQueuedPredecessors: 返回true则说明AQS中存在堵塞线程,只有在出现写锁的时候,才会将获取锁线程放入队列中,所以readerShouldBlock在读锁获取时,会永远返回false。
    非公平锁

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -8159625535654395037L;
            final boolean writerShouldBlock() {
                return false; // writers can always barge
            }
            final boolean readerShouldBlock() {
                // 只有堵塞队列第一个线程为非共享锁时才会返回true
                // 当队列前面已经出现写锁了,所有共享锁都不能和写锁竞争,放弃竞争
                return apparentlyFirstQueuedIsExclusive();
            }
        }
    

    从上面代码知道,只有这两个方法返回true,都不能去竞争锁,公平锁的策略非常明显,只有堵塞队列有线程,就会放弃锁竞争。而非公平锁则是在写锁时,无论队列有无线程都会尝试竞争,写锁时只有队列最前面的线程为写锁时,才会放弃竞争,总的来说公平锁和非公平锁逻辑和ReentrantLock 逻辑基本一样。

    tryLock

    在读锁、写锁的对象中,都存在tryLock 方法,它跟lock方法有“亿点点”不同,虽然他们都是调用了内部Sync方法,但是在获取锁方法上,和上面分析tryAcquire、tryAcquireShared基本一致,唯独缺少了readerShouldBlock、writerShouldBlock使用。使用这个方法获取锁,无论公平锁还非公平锁,获取锁逻辑都一样。无论堵塞队列是否有线程,会直接竞争获取锁,在非公平锁中读锁会让步队列中第一个写锁,写锁优先级会高于读锁。但tryLock不存在,所有锁的竞争的公平的,快速的,可以理解这个方法在获取锁上会有更高的优先级(相比lock)。

    相关文章

      网友评论

          本文标题:ReentrantReadWriteLock源码解析

          本文链接:https://www.haomeiwen.com/subject/kxnowrtx.html