美文网首页
4.ReentrantReadWriteLock核心原理分析

4.ReentrantReadWriteLock核心原理分析

作者: 致虑 | 来源:发表于2020-09-24 09:40 被阅读0次

    ReentrantReadWriteLock核心原理分析

    前面有详细介绍ReentrantLock,这里先明确一点,ReentrantLock是一个独占锁,支持锁重入,那么也就是每次只能有一个线程获取锁,有经验的小伙伴明显会发现一个问题:性能问题(读多写少)。类似于数据库(mysql),要是按照每次只有一个线程获取锁,那岂不要慢死。

    所以ReentrantReadWriteLock就是为了满足这点,在读-读线程间共享、在读-写、写-写线程间互斥,因此这就是ReentrantReadWriteLock读写锁的核心所在。

    那么跟前面分析ReentrantLock一样,要搞清楚ReentrantReadWriteLock的核心,就必须了解一下三点:

    1)怎么实现线程间锁的共享(读-读)

    2)怎么实现线程间锁的互斥(读-写、写-写)

    那么围绕上面两点,开始详细分析ReentrantReadWriteLock的核心实现。

    1.demo出发

    首先先手敲一个demo,这样就能有一个出发点了。

    public static void main(String[] args) {
      
      // 先定义好读写锁
      ReentrantReadWriteLock lock= new ReentrantReadWriteLock();
      ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
      ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    
      // 模拟读写线程(T1、T2为读线程,T3为写线程)
      new Thread(() -> read(target, readLock),"T1").start();
      new Thread(() -> read(target, readLock),"T2").start();
      new Thread(() -> write(target, writeLock),"T3").start();
    }
    
    // 模拟读
    private static void read(AtomicInteger target, ReentrantReadWriteLock.ReadLock readLock) {
      try {
        while (true) {
          readLock.lock();
          System.out.println("线程 " + Thread.currentThread().getName() + " 获取读锁");
          TimeUnit.SECONDS.sleep(2);
          System.out.println("线程 " + Thread.currentThread().getName() + " 释放读锁");
          readLock.unlock();
        }
      } catch (InterruptedException e) {}
    }
    
    // 模拟写
    private static void write(AtomicInteger target, ReentrantReadWriteLock.WriteLock writeLock) {
      try {
        while (true) {
          TimeUnit.SECONDS.sleep(1);
          writeLock.lock();
          System.out.println("线程 " + Thread.currentThread().getName() + " 获取写锁");
          TimeUnit.SECONDS.sleep(1);
          System.out.println("线程 " + Thread.currentThread().getName() + " 释放写锁");
          writeLock.unlock();
        }
      } catch (InterruptedException e){}
    }
    

    执行看下输出效果

    线程 T1 获取读锁
    线程 T2 获取读锁
    线程 T1 释放读锁
    线程 T2 释放读锁
    
    线程 T3 获取写锁
    线程 T3 释放写锁
    
    线程 T1 获取读锁
    线程 T2 获取读锁
    线程 T1 释放读锁
    线程 T2 释放读锁
    
    线程 T3 获取写锁
    线程 T3 释放写锁
    
    线程 T1 获取读锁
    线程 T2 获取读锁
    线程 T1 释放读锁
    线程 T2 释放读锁
    

    输出还蛮均匀的,看到了T1、T2可以同时获取读锁,在都释放后,T3可以获取写锁,T3释放后,T1、T2继续跟上。

    从上面的效果可以总结一点就是: 读-读可以同时进行,读-写不能同时进行,读-写获取锁必须等待对方完全释放(几次读锁获取,就释放几次)

    2. 从代码分析原理

    先来全局瞅一下ReentrantReadWriteLock的全貌

    public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
      // 读锁
      private final ReentrantReadWriteLock.ReadLock readerLock;
      
      // 写锁
      private final ReentrantReadWriteLock.WriteLock writerLock;
      
      // 同步器
      final Sync sync;
    
      // 默认非公平
      public ReentrantReadWriteLock() {
        this(false);
      }
    
      // 公平
      public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
      }
      ...
    }
    

    ReentrantReadWriteLock的整体结构已经有个大概了,简而言之就是,首先继承了ReadWriteLock,内部单独维护了两个子类对象,分别是ReadLock(读锁)跟WriteLock(写锁),同时实现了自己的AQS同步器(Sync)。那么从以上结构大致就能知道ReentrantReadWriteLock具备以下功能:

    1)支持锁重入

    2)支持公平、非公平

    3)支持读写锁

    这里锁重入、公平与非公平在分析ReentrantLock讲的很清楚,这里不再赘叙。

    那么前面分析AQS的时候有讲过,要想实现自定义同步语义,只需要实现AQS内部几个指定方法,借助AQS实现即可完成,那么看下ReentrantReadWriteLock中的AQS是如何自定义的。

    abstract static class Sync extends AbstractQueuedSynchronizer {
      // 定义共享锁最大数
      static final int SHARED_SHIFT   = 16;
      static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
      static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
      static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
      /** 共享保留数(读锁记录)  */
      static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
      /** 独占保留数(写锁记录)  */
      static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
      static final class HoldCounter {
        int count = 0;
        final long tid = getThreadId(Thread.currentThread());
      }
      static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
          return new HoldCounter();
        }
      }
    
      // 独占方式获取和释放锁
      protected final boolean tryAcquire(int acquires) {...}
      protected final boolean tryRelease(int releases) {...}
    
      // 共享方式获取和释放锁
      protected final int tryAcquireShared(int unused) {...}
      protected final boolean tryReleaseShared(int unused) {...}
      ...
      protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
      }
    
      final ConditionObject newCondition() {
        return new ConditionObject();
      } 
    }
    

    在分析AQS的时候,主要分析独占锁的实现主要实现tryAcquire和tryRelease,所以这里ReentrantReadWriteLock会详细分析下其实现逻辑。

    同时还记得在分析AQS的时候最后又讲过,关于共享方式锁相关核心知识会放在本章节进行详细分析,所以这里也会重点介绍tryAcquireShared与tryReleaseShared逻辑。

    在具体分析<u>ReentrantReadWriteLock#Sync</u>之前,先回到demo中的读写对象中的lock与unlock逻辑,看看它们最终对应的到底是该Sync中的哪些逻辑。

    WriteLock全貌

    // 写锁
    public static class WriteLock implements Lock, java.io.Serializable {
      private final Sync sync;
    
      protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
      }
    
      public void lock() {
        sync.acquire(1);
      }
      ...
      public void unlock() {
        sync.release(1);
      }
    
      public Condition newCondition() {
        return sync.newCondition();
      }
    
      public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
      }
    
      public int getHoldCount() {
        return sync.getWriteHoldCount();
      }
    }
    

    ReadLock全貌

    public static class ReadLock implements Lock, java.io.Serializable {
      private final Sync sync;
    
      protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
      }
    
      public void lock() {
        sync.acquireShared(1);
      }
      public void unlock() {
        sync.releaseShared(1);
      }
      ...
      public Condition newCondition() {
        throw new UnsupportedOperationException();
      }
    }
    

    对比代码很简单,这里WriteLock与ReadLock的区别就在于lock与unLock逻辑分别调用的是Sync中的 acquire 与 acquireShared及release与releaseShared。因此我们只需要搞清楚了ReentrantReadWriteLock#Sync中对应的实现的同步原理即可。

    3. ReentrantReadWriteLock#Sync核心分析

    从前面的篇幅,已经揭开了ReentrantReadWriteLock的面纱,只需要把其内部实现的AQS逻辑讲清楚,就等于知道了读写锁的本质,那接下来重点分析。

    3.1 tryAcquire(int acquires)

    独占锁获取逻辑

    
    protected final boolean tryAcquire(int acquires) {
      // 当前线程
      Thread current = Thread.currentThread();
      // 当前状态
      int c = getState();
        // 独占数
      int w = exclusiveCount(c);
      // 若state!=0,代表当前独占锁被占用,则需要考虑重入
      if (c != 0) {
        // 若独占数设置为0 或者 当前线程非持有锁线程(无法重入),则返回获取锁失败
        if (w == 0 || current != getExclusiveOwnerThread())
          return false;
        // 如果持有锁线程正式自身,则再判断下独占数是否满足
        if (w + exclusiveCount(acquires) > MAX_COUNT)
          throw new Error("Maximum lock count exceeded");
        // 设置状态,代表获取独占锁成功
        setState(c + acquires);
        return true;
      }
      
      // 到这里,独占锁没有被线程占有
      // writerShouldBlock是针对公平锁而言的
      // 尝试CAS设置状态值
      if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        return false;
      
      // 若设置成功,则设置占有标识
      setExclusiveOwnerThread(current);
      return true;
    }
    

    逻辑相对于前面介绍的ReentrantLock而言,基本上多了一些独占数的校验,其他逻辑就是先判断当前锁有没有被线程占有,若有则判断是否可进行重入,若没有则进行CAS设置。逻辑非常简单。

    3.2 tryRelease(int releases)

    独占锁释放逻辑

    protected final boolean tryRelease(int releases) {
      // 判断是否是当前线程独占
      if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
      
      // 释放对应的锁数量(考虑到锁重入)
      int nextc = getState() - releases;
      // 若释放对应的数量后,state==0,则认为完全释放锁
      boolean free = exclusiveCount(nextc) == 0;
      
      // 完全释放了就设置锁独占标识exclusiveOwnerThread = null
      if (free)
        setExclusiveOwnerThread(null);
      setState(nextc);
      return free;
    }
    

    独占锁的释放逻辑也很简单,考虑到重入,每次释放对应的数量,如果释放后state==0,则设置exclusiveOwnerThread = null,代表锁被释放

    接下来重点分析一下共享锁逻辑

    3.3 tryAcquireShared(int unused)

    共享锁(读锁)获取逻辑

    protected final int tryAcquireShared(int unused) {
      // 当前线程
      Thread current = Thread.currentThread();
      // 同步状态
      int c = getState();
      
      // 这个逻辑很重要,判断当前是否有独占锁(写锁)占有资源,如果有的话判断是不是自己
      // 因为写锁排他,所以如果上面条件成立,就获取读锁失败了(这里返回的是int哦)
      if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;
      
      // 获取共享锁(读锁)预留数
      int r = sharedCount(c);
      // 1.readerShouldBlock校验:是否需要阻塞(如果是公平锁就判断是不是头节点,如果是非公平就判断头节点是不是共享锁
      // 2.判断读锁预留数是否小于MAX_COUNT
      // 3.CAS获取同步state
      if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
        // 如果r==0,则代表是第一个线程获取锁,那么设置firstReader及firstReaderHoldCount
        if (r == 0) {
          firstReader = current;
          firstReaderHoldCount = 1;
        } 
        
        // 如果自己已经是firstReader,则直接设置firstReaderHoldCount++
        else if (firstReader == current) {
          firstReaderHoldCount++;
        } 
        
        else {
          
          // HoldCounter是个啥?它记录了当前占有锁的线程ID记录及数量
          // 这里每个获取拥有共享锁的线程都拥有一个HoldCounter来记录自己,这个保存在ThreadLocalHoldCounter里(说白了就是一个ThreadLocal)
          HoldCounter rh = cachedHoldCounter;
          if (rh == null || rh.tid != getThreadId(current))
            cachedHoldCounter = rh = readHolds.get();
          else if (rh.count == 0)
            readHolds.set(rh);
          rh.count++;
        }
        return 1;
      }
      
      // 完整版本的acquire for reads,用于处理tryAcquireShared中未处理的CAS未命中和可重入读取
      // 这段代码与本方法逻辑基本冗余,但注意一下内部是自旋方式尝试获取。
      return fullTryAcquireShared(current);
    }
    

    获取读锁(共享锁)的逻辑也很直白,首先判断当前同步状态是否不写锁占有(独占锁),如果是,则直接获取失败;如果当前没有写锁占有,则经过读锁预留数的一系列校验之后,尝试CAS改变state进行读锁获取,并且设置firstReader及firstReaderHoldCount等属性,同时维护ThreadLocal<HoldCounter> readHolds用来维护每个占有读锁的线程及其占有读锁数(锁重入),这里注意一点,只有当获取锁线程不是firstReader时,才会维护至readHolds中。

    如果上面因为公平特性或者读锁预留数等原因导致读锁获取失败,则尝试执行自旋进行CAS获取。

    所以读锁的获取逻辑还是蛮简单的。

    接下来分析一下读锁释放逻辑

    3.4 tryReleaseShared(int unused)

    共享锁(读锁)释放逻辑

    protected final boolean tryReleaseShared(int unused) {
      // 当前线程
      Thread current = Thread.currentThread();
      // 若是firstReader,则需要改变对应的firstReaderHoldCount
      if (firstReader == current) {
        if (firstReaderHoldCount == 1)
          firstReader = null;
        else
          firstReaderHoldCount--;
      } 
      
      // 若非firstReader,则改变readHolds中维护的HoldCounter
      else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
          rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
          readHolds.remove();
          if (count <= 0)
            throw unmatchedUnlockException();
        }
        --rh.count;
      }
      
      // 维护信息变更之后,自旋改变同步状态state
      for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        
        // 若更改后(释放数量)state==0,则返回true,代表彻底释放锁
        if (compareAndSetState(c, nextc))
          return nextc == 0;
      }
    }
    

    读锁释放逻辑也很简单,就是改变维护的一系列信息之后,尝试改变state。

    引申

    读写锁状态记录时如何维护的

    static final int SHARED_SHIFT   = 16;
    // 左移16位然后减1
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    // 取同步状态的低16位
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
    // 右移16位
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    

    还记得全貌代码里有这个逻辑没有单独介绍吗,其实ReentrantReadWriteLock对于读写锁的记录维护就在这,这里采用32位字节数组进行读写锁的记录,

    上面exclusiveCount(c) 就是将state与EXCLUSIVE_MASK相与sharedCount(int c) 是将同步状态右移16位,因此分为高16位与低16位,

    简而言之就是:

    位图

    高16位用来记录读锁被获取的次数低16位用来记录写锁被获取的次数

    那么到此,ReentrantReadWriteLock的核心原理就介绍完了。

    相关文章

      网友评论

          本文标题:4.ReentrantReadWriteLock核心原理分析

          本文链接:https://www.haomeiwen.com/subject/vweoyktx.html