美文网首页android
010 ReentrantReadWriteLock 读写锁 |

010 ReentrantReadWriteLock 读写锁 |

作者: __destory__ | 来源:发表于2019-03-26 18:10 被阅读17次

    所谓读写锁,即允许读线程之前同时访问(共享锁),读和写,以及写和写线程之间不能同时访问(排它锁)。JDK提供了ReentrantReadWriteLock实现读写线程的控制,可重入。

    private Map<String, Object> map = new HashMap<>();
    private ReadWriteLock rwl = new ReentrantReadWriteLock();
    private Lock r = rwl.readLock();
    private Lock w = rwl.writeLock();
    public Object get(String key) {
        r.lock();
        System.out.println(Thread.currentThread().getName() + " 读操作在执行..");
        try {
            return map.get(key);
        } finally {
            r.unlock();
            System.out.println(Thread.currentThread().getName() + " 读操执行完毕..");
        }
    }
    
    public void put(String key, Object value) {
        w.lock();
        System.out.println(Thread.currentThread().getName() + " 写操作在执行..");
        try {
            map.put(key, value);
        } finally {
            w.unlock();
            System.out.println(Thread.currentThread().getName() + " 写操作执行完毕..");
        }
    }
    
    

    那么,接下来,我们分析下ReentrantReadWriteLock 源代码,背后其实也借助了AQS,


    ReentrantReadWriteLock代码内部情况

    其中,NonfairSync和FairSync是Sync类的两种实现,分别定义了非公平的sync和公平的sync,其功能和Reentrantlock的一样,只是判断的方式不同。

    另外,定义了ReadLock和WriteLock两个内部类,毕竟读和写的逻辑不同,用两个不同的类分开实现功能,二者实现接口Lock,

    public interface Lock {
        void lock();
        void lockInterruptibly() throws InterruptedException;
        boolean tryLock();
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
        void unlock();
        Condition newCondition();
    }
    

    但是本质上,每个方法都是借助sync对象完成操作,我们重点关注如下四个方法,

    public static class ReadLock implements Lock, java.io.Serializable {
        private final Sync sync;
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    
        public void lock() {
            sync.acquireShared(1);
        }
    
        public void unlock() {
            sync.releaseShared(1);
        }
    }
    
    public static class WriteLock implements Lock, java.io.Serializable {
        private final Sync sync;
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    
        public void lock() {
            sync.acquire(1);
        }
    
        public void unlock() {
            sync.release(1);
        }
    }
    

    首先分析writelock的获锁和释放锁过程

    写锁的acquire和release

    和Reentrantlock的源码过程一致,调用sync的acquire和release方法,而这两个方法,会调用sync实现者的tryAcquire和tryRelease方法,


    Sync实现类的方法预览

    写锁是一个互斥的可重入锁,这点和Reentrantlock锁一样,因此,实现方式相同,获得锁的时候,如果没有get到,则进入阻塞队列。在tryAcquire中,如果目前没有锁,需要去竞争,此时,根据公平和非公平,对于写锁,有不同的判断,通过NonfairSync类和FairSync类的定义可以看到,

    static final class NonfairSync extends Sync {
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }
    
    static final class FairSync extends Sync {
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
    

    对于,writerShouldBlock方法而言,在NonfairSync 直接返回false,不需要阻塞写线程,而在FairSync中,需要根据hasQueuedPredecessors函数返回值来判断是否阻塞写线程,hasQueuedPredecessors在之前的文章中介绍过,当前线程不为空,并且阻塞的线程不是当前线程,返回true,即需要阻塞,反之不需要阻塞。

    接下来,看下,tryAcquire方法(不能无限循环,一次性给出false还是true,阻塞线程由aqs的acquire方法进行)

    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread(); //获得当前线程
        int c = getState(); //拿到aqs的int state
        int w = exclusiveCount(c); //获得排他锁的占用个数,
        if (c != 0) {  //当前有锁存在,无论是读锁,还是写锁
            if (w == 0 || current != getExclusiveOwnerThread()) //没有写锁,即,有读锁,或者,不是当前正在获得锁的线程,
                //返回false,阻塞当前线程。
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires); //否则,有写锁并且当前线程就是锁的持有这,可重入,更新aqs的state
            return true;
        }
        if (writerShouldBlock() || // 如果当前没有任何锁,则需要公平锁和非公平锁来判断是否阻塞线程
            !compareAndSetState(c, c + acquires))
            return false; //如果需要阻塞或者不需要阻塞但是cas加锁失败,则返回false,有aqs加入到队列并循环等待
        setExclusiveOwnerThread(current); //如果不需要阻塞并且也cas成功加到锁,则这里设置写锁持有线程。
        return true;
    }
    

    这里,使用了一个int state,来表示两个不同的锁,对应的加锁次数,怎么做到?将int的字节分成高16和低16位,分别表示,读锁和写锁,那么针对这个state的获取读和写锁不同的部分,就是bit操作了

    state的分块使用
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } //从state中获得读锁个数
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//从state中获得写锁个数
    

    看下,tryRelease方法(不能无限循环,一次性给出false还是true,唤醒其他线程由aqs的release进行),

    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively()) //非锁持有线程不能做,当前写锁只有一个线程获得(互斥锁),没有共享问题
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null); // 清空写锁的持有线程
        setState(nextc);
        return free;
    }
    

    读锁的acquire和release

    从上一节的过程看到,读锁在判断是否阻塞的时候,非公平的NofairSync,出现新的判断机制,参考方法,apparentlyFirstQueuedIsExclusive()

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null && // 有头
            (s = h.next)  != null && // 且有尾,
            !s.isShared()         && //第一个节点不是共享线程,即,是写线程,即,有写线程阻塞着呢
            s.thread != null; //线程存在
    }
    

    言外之意,虽然是非公平锁,读线程可以随意加锁不阻塞,但是呢,当有写线程正在阻塞的时候呢,读线程还是等等吧,先别读,直接阻塞进入队列,等待阻塞的写线程执行完毕(被唤醒后执行),在被唤醒执行。

    OK,接下来分析读锁的lock和unlock过程,对应sync方法的acquireShared和releaseShared方法。这里,同aqs的acquire和release过程一样,具体细节如下,

    // aqs的acquireShared方法
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) 
            //调用实现者的tryAcquireShared方法,如果小于零,1.表示需要阻塞读线程,则调用doAcquireShared加入到阻塞队列。
            doAcquireShared(arg);
    }
    //和acquire基本相同
    //2. 因为要阻塞,所以要通过addWaiter加入到队列的尾部
    //3. 进入循环,直接队列中前一个节点是head,即,被中断唤醒,需要执行,调整队列,head后移
    //4. 循环过程中,前一个节点不是head,则继续阻塞,同时,梳理下队列。
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED); // 加入到队列的尾部,注意shared状态节点
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); //不断判断前一个节点
                if (p == head) { // 如果是head,当前节点对应的线程被唤醒,
                    int r = tryAcquireShared(arg); // 获得一个读锁。
                    if (r >= 0) { // 如果获得成功
                        setHeadAndPropagate(node, r); // 设置队列的head,同时,propagate队列
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt(); //自我中断
                        failed = false;
                        return;
                    }
                }
    
                   //在shouldParkAfterFailedAcquire中继续查看前一个节点的状态
                   //shouldParkAfterFailedAcquire如果前一个节点不是signal且无效状态的话清理返回false,如果有
                   //如果有效状态,将其设置为signal等待下次循环中被调用,如果前一个节点是signal,返回true
                if (shouldParkAfterFailedAcquire(p, node) &&  // 返回true,则需要进行阻塞
                    parkAndCheckInterrupt()) // 如果需要阻塞,则调用LockSupport进行阻塞(线程停止运行,等待中断后,再继续循环)
                    interrupted = true;
                   //被中断唤醒, 虽然被唤醒,但是前一个节点是head的才能有机会获得锁,
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    //如果执行到这,说明唤醒了一个读线程,此时,是因为之前有写线程阻塞,为了优先写线程执行,读线程
    //被阻塞,待到写线程被唤醒后执行完毕,这里的读线程被唤醒,而该读线程后面,还有更多等待的读线程,
    //都可以被唤醒,所以这叫做propagete传播唤醒。
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        setHead(node);
        // propagate也就是state的更新值大于0,代表可以继续acquire
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 判断后继节点是否存在,如果存在 且是共享锁,即,读线程。
            // 然后进行共享模式的释放
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    
    /**
    一直循环,直到队列为空,做一件事情,如果节点是signal,则cas设置状态为0,成功后,唤醒节点对应
    线程,否则,如果节点状态是0,表示位置状态,cas设置为Propagate状态(-3),成功后,继续下一次循环
    **/
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { //可以唤醒,
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //设置状态为0
                        continue;           
                    unparkSuccessor(h);  // 中断阻塞的线程 (唤醒)
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 调整节点的状态
                    continue;             
            }
            if (h == head)            // 队列为空的时候退出循环,如果不为空,则一直在propagation
                break;
        }
    }
    

    doReleaseShared的作用,就是找到队列中状态为signal的节点,并将其唤醒,不进行队列的调整,唤醒后,状态变成0,之后又变成-3。所以经过doReleaseShared之后,原本为signal的线程被唤醒,队列中所有节点状态变成-3,队列长队没有变。

    shouldParkAfterFailedAcquire在判断当前节点前节点的状态,如果不是signal且有效(0,或者-3),则设置为signal,因此,即使doReleaseShared将节点设置了-3,shouldParkAfterFailedAcquire也会将其设置为-1 signal。

    队列的长度发生变化,在于parkAndCheckInterrupt()函数被中断唤醒后,此时,多个线程可能被唤醒,都在此函数后继续执行,参考,acquireShared函数,虽然唤醒了多个线程,但是,只有前置节点是head节点,才能获取锁后,调整队列长度,其他线程可能又会进入到阻塞,不过没关系,shouldParkAfterFailedAcquire进会将前一个节点设置为signal,因此,doReleaseShared中,依旧会唤醒线程。

    例如,四个阻塞节点

    • -1,-1,-1,-1 (初始,都阻塞)
    • -3,-3,-3,-3 (doReleaseShared全部是唤醒,且状态都变成-3,也可以还有0,)
    • -1,-1,-1 (第一个节点成功获得锁,调整队列,其他线程循环后,shouldParkAfterFailedAcquire将-3变成了-1,后,阻塞自己)
    • 回到第一步,如此反复执行,直到队列为空。

    接下来,看下releaseShared方法

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    可以看到使用了doReleaseShared,批量唤醒多个等待的读线程(也可能有写线程)。这里也会调用aqs的实现类的tryReleaseShared方法。

    最后,统一看下aqs实现类读锁的tryAcquireShared和tryReleaseShared方法,

    
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current) //如果有写锁,并且写锁的线程持有者不是当前线程
            return -1; //则返回-1,获得锁失败
        int r = sharedCount(c); //获取读锁的个数
        if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
            //如果不需要阻塞读线程,且,成功设置了state中读锁的个数位,表明获得到了读锁。
            if (r == 0) { //单独考虑第一个获得读锁的线程,程序执行效率优化目的
                firstReader = current;
                firstReaderHoldCount = 1; // 效率优化
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter; 
                //cachedHoldCounter效率优化,缓存上一次的HoldCounter 
    
                if (rh == null || rh.tid != getThreadId(current)) 
                  //如果上次缓存的holder不是当前线程,则冲LocalThread(readHolds封装了LocalThread)中重新获取。
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)  //如果是自己的,当时count=0,则再次加入到LocalThread中,
                    readHolds.set(rh); //这里为何要重放一次???。
                rh.count++; //累加计数
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    

    tryAcquireShared只有一次if,对于阻塞或者获取锁失败的读线程如何处理,参考fullTryAcquireShared。

    /**
    循环的在做tryAcquireShared相同的工作。
    */
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1; //有写锁,且当前线程不是写锁的持有者
            } else if (readerShouldBlock()) { //当度处理,如果是需要阻塞线程怎么办
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter; //效率优化
                        if (rh == null || rh.tid != getThreadId(current)) { //不是自己的HoldCounter
                            rh = readHolds.get(); //拿出自己的,
                            if (rh.count == 0) //之前没有过任何重入,则,删除ThreadLocal中的记录
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)  //返回阻塞
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
    
            //执行到这,说明,没有写线程,且当前读线程不能阻塞(之前),要继续竞争锁,
            //tryAcquireShared中只进行一次获得锁,只能一个线程成功,其他线程在这里继续竞争。
            if (compareAndSetState(c, c + SHARED_UNIT)) {//获得到读锁
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1; //效率优化
                } else if (firstReader == current) {
                    firstReaderHoldCount++; //效率优化
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;  //效率优化
                    if (rh == null || rh.tid != getThreadId(current)) // 不是自己的Holder
                        rh = readHolds.get(); //拿自己的
                    else if (rh.count == 0) //
                        readHolds.set(rh); //为何重放?难道是因为上面的remove?
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    

    fullTryAcquireShared作用,其实就让多个读线程,cas更新state中读锁的个数。接下来,看下

    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
                  //调整holdcounter,如果重入次数减为1,则删除ThreadLocal中的记录。
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) { //cas更新读锁的个数
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0; //当最后一个读线程释放锁,返回true,其余返回false。
        }
    }
    

    相关文章

      网友评论

        本文标题:010 ReentrantReadWriteLock 读写锁 |

        本文链接:https://www.haomeiwen.com/subject/ugjgvqtx.html