美文网首页一些收藏收藏
ReentrantReadWriteLock之一

ReentrantReadWriteLock之一

作者: 程序员札记 | 来源:发表于2022-04-01 21:32 被阅读0次

    ReentrantLock是排他锁,排他锁在同一时刻仅有一个线程可以进行访问,实际上独占锁是一种相对比较保守的锁策略,在这种情况下任何“读/读”、“读/写”、“写/写”操作都不能同时发生,这在一定程度上降低了吞吐量。然而读操作之间不存在数据竞争问题,如果"读/读"操作能够以共享锁的方式进行,那会进一步提升性能。因此引入了ReentrantReadWriteLock,顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下面称它为读写锁。

    读写锁内部又分为读锁和写锁,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。读锁和写锁分离从而提升程序性能,读写锁主要应用于读多写少的场景。

    一般情况下,读写锁的性能都会比排它锁好,因为大多数场景是读多于写的。在读多写少的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量,读写锁主要具有以下特性:

    而读写锁有以下三个重要的特性:
    (1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
    (2)重进入:读锁和写锁都支持线程重进入。
    (3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
    读写锁中同样依赖队列同步器Sync(AQS)实现同步功能,而读写状态就是其同步器的同步状态。下面从例子中来说明:读读共享,读写互斥,写写互斥。

    共享 互斥
    同线程共享,不同线程互斥 互斥

    下面是个cache 的例子

    package com.concurreny.aqs;
    
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
    
    import javax.xml.crypto.Data;
    //读写锁ReentrantReadWriteLock:读读共享,读写互斥,写写互斥!
    public class ReentrantWriteReadLockTest {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        ReadLock readLock = lock.readLock();
        WriteLock writeLock = lock.writeLock();
        
        public void read(){
            try {
                readLock.lock();
                System.out.println("线程"+Thread.currentThread().getName()+"进入。。。");
                Thread.sleep(3000);
                System.out.println("线程"+Thread.currentThread().getName()+"退出。。。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                readLock.unlock();
            }
        }
        
        public void write(){
            try {
                writeLock.lock();
                System.out.println("线程"+Thread.currentThread().getName()+"进入。。。");
                Thread.sleep(3000);
                System.out.println("线程"+Thread.currentThread().getName()+"退出。。。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                writeLock.unlock();
            }
        }
        
     
        public static void main(String[] args) {
            final ReentrantWriteReadLockTest wr = new ReentrantWriteReadLockTest();
            Thread t1 = new Thread(new Runnable() {
                public void run() {
                    wr.read();
                }
            }, "t1");
            Thread t2 = new Thread(new Runnable() {
                public void run() {
                    wr.read();
                }
            }, "t2");
            Thread t3 = new Thread(new Runnable() {
                public void run() {
                    wr.write();
                }
            }, "t3");
            Thread t4 = new Thread(new Runnable() {
                public void run() {
                    wr.write();
                }
            }, "t4");
            
            t1.start();
            t2.start();
            //t3.start();
            //t4.start();
        }
    
    }
    class RWDictionary {
        private final Map<String, Data> m = new TreeMap<String, Data>();
        private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        private final Lock r = rwl.readLock();
        private final Lock w = rwl.writeLock();
    
        public Data get(String key) {
            r.lock();
            try { return m.get(key); }
            finally { r.unlock(); }
        }
        public String[] allKeys() {
            r.lock();
            try { return (String[]) m.keySet().toArray(); }
            finally { r.unlock(); }
        }
        public Data put(String key, Data value) {
            w.lock();
            try { return m.put(key, value); }
            finally { w.unlock(); }
        }
        public void clear() {
            w.lock();
            try { m.clear(); }
            finally { w.unlock(); }
        }
     }
    
    

    源码解读

    image.png

    我们先来看下 ReentrantReadWriteLock 类的整体结构:

        /** 读锁 */
        private final ReentrantReadWriteLock.ReadLock readerLock;
        /** 写锁 */
        private final ReentrantReadWriteLock.WriteLock writerLock;
        final Sync sync;    
        /** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
        public ReentrantReadWriteLock() {
            this(false);
        }
        /** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
        /** 返回用于写入操作的锁 */
        public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }    
        /** 返回用于读取操作的锁 */
        public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
        abstract static class Sync extends AbstractQueuedSynchronizer {}
        static final class NonfairSync extends Sync {}
        static final class FairSync extends Sync {}
        public static class ReadLock implements Lock, java.io.Serializable {}
        public static class WriteLock implements Lock, java.io.Serializable {}
    
    

    类的继承关系

    public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}
    说明:可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock实现了自己的序列化逻辑。

    类的内部类

    ReentrantReadWriteLock有五个内部类,五个内部类之间也是相互关联的。内部类的关系如下图所示。


    image.png

    说明:如上图所示,Sync继承自AQS、NonfairSync继承自Sync类、FairSync继承自Sync类(通过构造函数传入的布尔值决定要构造哪一种Sync实例);ReadLock实现了Lock接口、WriteLock也实现了Lock接口。

    Sync类:
    (1)类的继承关系
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    说明:Sync抽象类继承自AQS抽象类,Sync类提供了对ReentrantReadWriteLock的支持。
    (2)类的内部类
    Sync类内部存在两个内部类,分别为HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要与读锁配套使用,其中,HoldCounter源码如下。

    // 计数器
    static final class HoldCounter {
        // 计数
        int count = 0;
        // Use id, not reference, to avoid garbage retention
        // 获取当前线程的TID属性的值
        final long tid = getThreadId(Thread.currentThread());
    }
    

    说明:HoldCounter主要有两个属性,count和tid,其中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。ThreadLocalHoldCounter的源码如下

    // 本地线程计数器
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        // 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    

    说明:ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。

    (3)类的属性

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 版本序列号
        private static final long serialVersionUID = 6317671515068378041L;        
        // 高16位为读锁,低16位为写锁
        static final int SHARED_SHIFT   = 16;
        // 读锁单位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 读锁最大数量
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 写锁最大数量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        // 本地线程计数器
        private transient ThreadLocalHoldCounter readHolds;
        // 缓存的计数器
        private transient HoldCounter cachedHoldCounter;
        // 第一个读线程
        private transient Thread firstReader = null;
        // 第一个读线程的计数
        private transient int firstReaderHoldCount;
    }
    

    说明:该属性中包括了读锁、写锁线程的最大量。本地线程计数器等。
    (4)类的构造函数

    // 构造函数
    Sync() {
        // 本地线程计数器
        readHolds = new ThreadLocalHoldCounter();
        // 设置AQS的状态
        setState(getState()); // ensures visibility of readHolds
    }
    

    说明:在Sync的构造函数中设置了本地线程计数器和AQS的状态state。

    读写状态的设计

    同步状态在重入锁的实现中是表示被同一个线程重复获取的次数,即一个整形变量来维护,但是之前的那个表示仅仅表示是否锁定,而不用区分是读锁还是写锁。而读写锁需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。
    读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写。

    image.png

    假设当前同步状态值为S,get和set的操作如下:
    (1)获取写状态:
    S&0x0000FFFF:将高16位全部抹去
    (2)获取读状态:
    S>>>16:无符号补0,右移16位
    (3)写状态加1:
    S+1
    (4)读状态加1:
      S+(1<<16)即S + 0x00010000
    在代码层的判断中,如果S不等于0,当写状态(S&0x0000FFFF),而读状态(S>>>16)大于0,则表示该读写锁的读锁已被获取。

    写锁的获取与释放

    看下WriteLock类中的lock和unlock方法:

    public void lock() {
        sync.acquire(1);
    }
     
    public void unlock() {
        sync.release(1);
    }
    

    可以看到就是调用的独占式同步状态的获取与释放,因此真实的实现就是Sync的 tryAcquire和 tryRelease。
    写锁的获取,看下tryAcquire:

    protected final boolean tryAcquire(int acquires) {
        //当前线程
        Thread current = Thread.currentThread();
        //获取状态
        int c = getState();
        //写线程数量(即获取独占锁的重入数)
        int w = exclusiveCount(c);
        
        //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
        if (c != 0) {
            // 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false;
            // 如果写锁状态不为0且写锁没有被当前线程持有返回false
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;        
            //判断同一线程获取写锁是否超过最大次数(65535),支持可重入
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //更新状态
            //此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。
            setState(c + acquires);
            return true;
        }    
        //到这里说明此时c=0,读锁和写锁都没有被获取
        //writerShouldBlock表示是否阻塞
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;    
        //设置锁为当前线程所有
        setExclusiveOwnerThread(current);
        return true;
    }
    

    其中exclusiveCount方法表示占有写锁的线程数量,源码如下:
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    说明:直接将状态state和(2^16 - 1)做与运算,其等效于将state模上2^16。写锁数量由state的低十六位表示。
    从源代码可以看出,获取写锁的步骤如下:
    (1)首先获取c、w。c表示当前锁状态;w表示写线程数量。然后判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行(2);否则执行(5)。
    (2)如果锁状态不为零(c != 0),而写锁的状态为0(w = 0),说明读锁此时被其他线程占用,所以当前线程不能获取写锁,自然返回false。或者锁状态不为零,而写锁的状态也不为0,但是获取写锁的线程不是当前线程,则当前线程也不能获取写锁。
    (3)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新是线程安全的),返回true。
    (4)如果state为0,此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。
    (5)成功获取写锁后,将当前线程设置为占有写锁的线程,返回true。
    方法流程图如下:


    image.png

    相关文章

      网友评论

        本文标题:ReentrantReadWriteLock之一

        本文链接:https://www.haomeiwen.com/subject/bluajrtx.html