美文网首页
ReadWriteLock

ReadWriteLock

作者: 闲来也无事 | 来源:发表于2020-11-15 21:49 被阅读0次

    2、 ReadWriteLock 接口

    ReentrantReadWriteLock是ReentrentLock接口实现之一,具有以下特性:

    1. 提供了非公平模式(默认)和公平模式。
    2. 支持重入。
    3. 支持锁降级。写锁可降级为读锁,但是读锁不可升级为写锁。
    4. 支持中断。读锁和写锁都支持锁获取期间的中断。
    5. 支持Condition。

    ReadWriteLock提供了一对锁,读锁和写锁。其中读锁是共享锁,同一时间可以有多个线程访问;写锁是独占锁(排它锁),同一时间只能有一个线程访问。相对于可重入锁,读写锁通过读写分离、读锁可共享的方式进一步提高了并发性能。

    读写锁简单示例:

    /**
     * 使用ReentrantReadWriteLock实现线程安全的HashMap
     */
    public class ReadWriteLockDemo {
    
        // 读写锁实例
        private ReadWriteLock lock = new ReentrantReadWriteLock();
        // 读锁
        private Lock rLock = lock.readLock();
        // 写锁
        private Lock wLock = lock.writeLock();
        // 共享变量
        private Map<String, String> map = new HashMap<>();
        
        // 写
        public void put(String key, String value) {
            wLock.lock();
            try {
                map.put(key, value);
            } finally {
                wLock.unlock();
            }
        }
        
        // 读
        public void get(String key) {
            rLock.lock();
            try {
                map.get(key);
            } finally {
                rLock.unlock();
            }
        }
    }
    

    ReentrantReadWriteLock源码结构整体上与ReentrantLock相似,部分源码:

    // 同步器
    abstract static class Sync extends AbstractQueuedSynchronizer{}
    // 非公平锁同步器
    static final class NonfairSync extends Sync{}
    // 公平锁同步器
    static final class FairSync extends Sync{}
    
    // 读锁
    public static class ReadLock implements Lock, java.io.Serializable{}
    // 写锁
    public static class WriteLock implements Lock, java.io.Serializable{}
    

    关于AbstractQueuedSynchronizer前文已有介绍,这里重点要介绍一下state变量。前文在分析ReentrantLock的时候介绍过state记录了同步状态。那么对于读写锁呢?如何用一个变量同时记录读状态和写状态呢?

    答案是将state变量的高16位和低16位拆分,高16位记录读状态,低16位记录写状态即可。当然这种设计模式也决定了ReentrantReadWriteLock最多提供65535个重入写锁、65535个读锁。ReentrantReadWriteLock使用了位运算来对state变量进行加一、减一的操作。

    state值操作示例:

    // 获取写状态值
    state & 0x0000FFFF
    // 获取读状态值
    state >>> 16
    
    // 写状态值加1
    state + 1
    // 读状态值加1
    state << 16
    
    // 写状态值减1
    state - 1
    // 读状态值减1
    state - (1 << 16)
    

    2.1 锁降级

    锁降级:对于一个线程T,先获取写锁,再获取读锁,最后释放写锁。

    JDK官方示例:

    /**
     * ReentrantReadWriteLock类注释中的锁降级示例
     */
    public class CachedData {
        Object data;
        volatile boolean cacheValid;
        final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        void processCachedData() {
            rwl.readLock().lock();
            if (!cacheValid) {
                // Must release read lock before acquiring write lock
                // 在获取写锁之前必须释放读锁
                rwl.readLock().unlock();
                rwl.writeLock().lock();
                try {
                    // Recheck state because another thread might have
                    // acquired write lock and changed state before we did.
                    // 重新检查状态,因为其他线程可能在我们之前已经写锁并更改了状态。
                    if (!cacheValid) {
                        // 修改data(写操作)
                        data = ...
                        cacheValid = true;
                    }
                    // Downgrade by acquiring read lock before releasing write lock
                    // ① 通过在释放写锁之前获取读锁来降级
                    rwl.readLock().lock();
                } finally {
                    // Unlock write, still hold read
                    // 释放写锁,持有读锁
                    rwl.writeLock().unlock(); 
                }
            }
            try {
                // 使用data(读操作)
                use(data);
            } finally {
                // 释放降级读锁
                rwl.readLock().unlock();
            }
        }
    }
    

    processCachedData方法通过cacheValid变量值兼顾了读锁和写锁的功能:若cacheValid为true,为读锁;若cacheValid为false为写锁(兼读锁)。

    1. 为什么要进行锁降级
      可重入读写锁可以说是对可重入锁的一种改进,其目的之一就是为了提高并发性能。假如因为use(data)方法耗时较长而导致写锁不释放,那么其他的读锁将无法使用data。所以使用锁降级,也能够有效的提高并发性能。
    2. 为什么必须在释放写锁之前获取读锁(对应代码标记①)
      1. 假线程T1没有获取读锁,直接释放写锁,在调用use(data)方法前,线程T2获取了写锁并修改了data,随后T1开始调用use(data)方法,那么线程T1就会读取到脏数据,所以这一步是必须的,其目的就是为了解决数据可见性的问题。
      2. 如果线程T1先获取读锁,再释放写锁。那么线程T2将会在获取写锁时阻塞,待T1释放了读锁之后,T2才能获取写锁并操作data,从而保证了数据可见性。

    接下来开始分析读写锁的源码,注意,以下代码若无特别声明,均为非公平模式。

    2.2 读锁

    结合上文的示例,开始分析读锁的加锁、解锁过程。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    1. 调用tryAcquireShared尝试直接获取读锁。
    2. 若未能获取到读锁,调用doAcquireShared将当前现场构造成Node节点,入队阻塞,直至获取到读锁。
    2.2.1 tryAcquireShared
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        // ①
        if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
            return -1;
        // ②
        int r = sharedCount(c);
        if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        // ③
        return fullTryAcquireShared(current);
    }
    

    变量解释:

    • firstReader,记录第一个获取读锁的线程。
    • firstReaderHoldCount,记录第一个获取读锁的线程的重入次数。
    • cachedHoldCounter,成功获取readLock的最后一个线程的持有计数。在通常情况下,下一个要释放的线程是最后一个要获取的线程,这样可以节省ThreadLocal查找。
    • readHolds,当前线程持有的可重入读锁的数量。继承自ThreadLocal类。持有HoldCounter,记录了线程id、线程数。

    tryAcquireShared分为3步:

    1. 持有写锁的线程非当前线程。返回-1,即失败。从这里也可以看出,同一个线程持有写锁的同时,可以再次获取读锁。
    2. 如果读锁不应被阻塞,且读锁未饱和,且成功更新了同步状态,接下来还要做三个判断:
      1. 读锁数为0,设置firstReader和firstReaderHoldCount,用来记录第一个获取读锁的线程及其重入次数。
      2. firstReader与当前线程相同,代表读锁重入,将firstReaderHoldCount加1。
      3. 上述两个条件均不满足,说明已经有其他线程获取读锁。则将当前线程缓存至readHolds(该变量为ThreadLocal类型)
    3. 上述两步都未满足,执行获取读锁的“全量版本”。

    需要详细解释一下readerShouldBlock()方法:对于非公平锁,其目的是为了防止获取写锁的线程饥饿;而对于公平锁,其目的是为保证线程获取锁的公平性。

    可以通过下面的代码来测试:

    /**
     * 测试readerShouldBlock()方法
     */
    public class ReaderShouldBlock {
        static ReadWriteLock rwl = new ReentrantReadWriteLock();
        
        public static void main(String[] args) {
        
            // T1
            new Thread(() -> {
                rwl.writeLock().lock();
                // ①
                rwl.readLock().lock();
                try {
                    
                } finally {
                    rwl.readLock().unlock();
                    rwl.writeLock().unlock();
                }
            }).start();
            
            // T2
            new Thread(() -> {
                rwl.writeLock().lock();
                try {
                } finally {
                    rwl.writeLock().unlock();
                }
            }).start();
        }
    }
    

    测试的时候,需要在代码①处和readerShouldBlock()方法同时断点,则readerShouldBlock()会返回true。对于非公平锁:判断AQS队列中是否有写锁等待获取线程;对于公平锁:判断AQS中是否有其他线程比当前线程更早的获取锁。


    以全量版本获取读锁:

    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            // ①
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            }
            // ② 
            else if (readerShouldBlock()) {
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            // ③
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // ④
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    

    fullTryAcquireShared通过”CAS“再次获取锁:

    1. 其他线程持有写锁,
      1. 当前线程非持有写锁的线程,返回-1,即失败。
      2. 当前线程是持有写锁的线程,阻塞,死锁。
    2. 需要阻塞读锁。这里阻塞的前提我认为应该是只发生在公平锁的情况下,不知道对不对。
    3. 读锁饱和,抛出”Maximum lock count exceeded“错误
    4. 获取锁成功。
    2.2.2 doAcquireShared

    如果tryAcquireShared方法未能获取到读锁,则通过doAcquireShared方法将当前线程构造成Node节点,入队并阻塞,忽略中断,直至获取到同步状态。

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    该过程与Lock接口的逻辑相似,且较简单,可参考前文。

    2.3 写锁

    // ReentrentReadWriteLock
    public void lock() {
        sync.acquire(1);
    }
    
    // AQS
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    与Lock接口的实现类ReentrentLock一样,ReentrentReadWriteLock类也是先调用了AQS的acquire方法。但是具体的实现细节肯定是不尽相同的。

    2.3.1 tryAcquire
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCou****nt(c);
        // ①
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
        // ②
        if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
            return false;
        // ③    
        setExclusiveOwnerThread(current);
        return true;
    }
    
    1. 同步状态不为0,说明已经有线程获取到锁(可能是读锁,也可能是写锁)
      1. 写锁数为0,则说明其他线程持有读锁,失败;写锁数不为0但持有锁的线程非当前线程,失败。
      2. 写锁数饱和,抛出“Maximum lock count exceeded”错误。
      3. 上述条件均不满足,则说明当前写锁为重入锁,更新同步状态。
    2. 同步状态为0
      1. 获取写锁的线程是否应被阻塞,对于非公平锁来说,总是返回true。该方法是实现公平锁和非公平锁的关键。其依然是通过hasQueuedPredecessors()方法来判断,可参考前文Lock接口。
      2. CAS设置同步状态失败。失败。
    3. 获取同步状态成功。
    2.3.2 acquireQueued

    acquireQueued方法与ReentrentLock一致,参考前文即可。

    2.4 其他

    关于tryLock、lockInterruptibly等方法与前文重入锁分析相似,不再赘述。

    相关文章

      网友评论

          本文标题:ReadWriteLock

          本文链接:https://www.haomeiwen.com/subject/ogeobktx.html