美文网首页Java并发
Java并发--ReentrantReadWriteLock

Java并发--ReentrantReadWriteLock

作者: 慕北人 | 来源:发表于2020-04-06 19:24 被阅读0次

    对于ReentrantReadWriteLock这个类来说,有几个内部类和成员必须要说,这些成员都是用来记录锁的数量的

    • state:其含义不在单纯是锁的数量了,其高16位代表共享锁的数量低16位代表独占锁的数量
    • final int SHARED_SHIFT:值为16
    • final int SHARED_UNIT:值为2的16次方,主要用来在共享锁成功加锁后state + SHARED_UNIT的操作就代表共享锁的个数加了1
    • final int MAX_COUNT:2的16次方减一,表示共享锁和独占锁最大个数
    • final int EXCLUSIVE_MASK:2的16次方减一,用来作为一个掩码,与state按位相与求出独占锁的个数

    HoldCounter

    static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }  
    

    tid是ThreadId,这个实现是通过Unsafe调用native方法,我们不做深究;只需要记住这个类的对象的作用就是记录某一个持有共享锁的线程以及持有共享锁的个数

    ThreadLocalHoldCounter

    static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }  
    

    这个类继承自ThreadLocal,作用就是为每个线程保存住他们的HoldCounter

    • private transient ThreadLocalHoldCounter readHolds
    • private transient HoldCounter cachedHoldCounter;
    • private transient Thread firstReader = null;
    • private transient int firstReaderHoldCount;

    一、公平锁

    在介绍其他之前,我们得先明白两个之后会频繁用到的方法:

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }  
    

    从这两个方法的名字我们就能看出,分别是判断读锁和写锁在申请时是否需要阻塞,他们的判断很简单,就是看队列前面是否还有其他在阻塞的锁,如果有,那么需要排在后面

    1. 读共享锁

    读锁是一个共享锁,不同线程可以同时加读锁。

    1. lock()

    public void lock() {
        sync.acquireShared(1);
    }   
    

    可见,其实现就是调用的acquireShared(1),而该方法的实现也是在AQS中的:

      public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
      }  
    

    根据我们的猜测,tryAcquireShared的返回值如果小于零,一定对应的是加锁失败的情况,如果tryAcquireShared返回值大于零,一定对应的是加锁成功的情况。而调用tryAcquireShared方法时传递的也是1:

      protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            int c = getState();  
              
            // 如果当前申请加锁的线程不是独占锁的线程,那么拒绝,返回-1  
            // 也就是说,一旦有人申请了独占锁,那么后续其他线程申请的共享锁一律拒绝
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 获取当前共享锁的个数
            int r = sharedCount(c);
    
            // 对于公平锁来说,readerShouldBlock方法的实现就是return hasQueuedProcessor
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
    
                // 前两个if语句分支用来维护firstReader,及其加锁的次数
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    // rh使用ThreadLocal为每一个线程维护,记录该线程目前持有的读锁的个数
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            // 专门处理:1. 由于其他线程CAS操作导致本本线程CAS失败导致的加锁失败  2. 由于readerShouldBlock返回true
            return fullTryAcquireShared(current);
        }  
    

    对于该方法实现,我们可以有几个认识:

    1. 如果当前有人持有独占锁(写锁),那么后续只有持有该独占锁的线程才能够再次加锁,对于其他的直接返回-1了,表示加锁失败
    2. 第二个if语句返回true的条件为:该线程前方没有阻塞的节点了,而且CAS操作执行成功成功加锁,那么只要进入了if代码块里最终返回的一定是1,表示加锁成功
      1. if语句里面的代码用来维护firstReader成员和其count;或者是该线程所对应的HoldCount
    3. 如果第二个if返回false,则说明:该线程前方仍有阻塞的节点,或者CAS操作执行失败说明有另一个线程并发修改了state,此时,则会执行fullTryAcquireShared方法

    注意:虽然文章一开头就介绍了HoldCount是记录持有共享锁的线程以及其持有共享锁的个数的,但是通过第二个if语句的处理我们发现firstReader并没有通过HoldCount维护,而是直接使用现有的两个成员维护的

    final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
    
                // 如果当前申请加锁的线程不是独占锁的线程,那么拒绝,返回-1  
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {      // 说明readerShouldBlock
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }  
    

    首先,需要执行该方法的情况有两个:

    1. 在进行共享锁加锁的时候readerShouldBlock()返回了true,在公平锁的情况下该情况表示队列前方还有其他处于阻塞状态的节点
    2. 在加锁过程执行CAS操作修改state时由于并发原因,state值有变动,导致的加锁失败

    由于该方法较长,我们将其分为块内容:

    1. 第一个if语句:该语句处理的情况是目前有线程(称其为独占线程)持有独占锁,如果不是独占线程申请的重入锁的话,则直接加锁失败
    2. 第一个elseif语句:首先能够执行到该分支里的大条件是readerShouldBlock方法返回true,在公平锁的前提下,该方法返回true意味着阻塞队列中前面还有其他被阻塞的线程在排队等待
      1. 第一个if语句:说明是firstReader申请的重入锁,由于是公平锁,所以需要先来后到,那么本次for循环对其不做处理
      2. else语句:该分支处理的情况是非firstReader线程申请共享锁,那么就需要判断其申请的是否是重入锁
        1. 两个if语句:该if语句中,如果得知该线程是首次申请共享锁的话,由于本来队列中就有在等待的节点未处理,那么就不会再批准该线程的申请了,直接返回-1表示加锁失败
    3. 第三个if语句:校验共享锁分发数量是否到达上限
    4. 第四个if语句:直接通过CAS尝试加锁,之后维护加锁成功后需要更新的值,然后返回1表示加锁成功。

    感悟:只看该方法我们可以有一个新的发现,首先如果当前锁已经被独占线程所独占了,那么出来独占线程其他的线程申请共享锁直接返回-1表示失败,这个很容易理解;但是,对于其他情况,就有一个小的区别,如果当前线程申请的是重入锁,那么elseif语句里面我们可以发现对我们当前线程毫无作用,因为第四个if语句有CAS操作可以为其尝试加锁,就算这一次没有成功,由于这个方法是一个自旋,下一个循环我还会可以尝试,除非期间有其他的线程获取了独占锁,否则当前申请的重入锁可以一直尝试加锁直到成功;但是对于申请普通共享锁的线程就不是这么个情况了,因为第一个elseif语句中当readerShouldBlock方法返回true时,首次申请共享锁的线程会在后面返回-1表示加锁失败。因此,我们可以领悟到,原来对于共享锁来说重入共享锁会无限制进行加锁尝试,而普通共享锁则只有在阻塞队列中没有其他阻塞节点时(在公平锁的情况下)才能尝试一次加锁。

    好了,到这里我们对于公平锁加共享锁的情况我们就大概差不多了。无非就是分了三种情况讨论:

    1. 有线程持有独占锁,则只有独占线程可以申请共享锁
    2. 没有线程持有独占锁,则重入锁可以无限制尝试加锁
    3. 没有线程持有独占锁,则普通共享锁可能需要排队获取(在下文的soAcquireShared中)

    Q:如果是独占线程申请普通共享锁,需要排队吗?
    Q:如果是重入性共享锁申请失败,需要加入阻塞队列吗?
    Q:为何首次加锁的线程才需要排队呢?共享锁按理说不互斥的情况下,早晚我都会成功加锁,现在就让我加锁不是一样吗?
    答:还真不一样,如果在队列中有人等待的情况下,还可以使得这些不为于阻塞队列中的、首次申请共享锁的线程进行CAS操作,那么会导致大量的tryAcquireShared方法调用中遇到CAS碰撞导致for自旋,这样带来的坏的影响有两个:1. 大量的for自旋造成了CPU资源的浪费 2. 处于阻塞队列中的、阻塞在共享锁上的节点由于需要挨个挨个唤醒,整个唤醒过程会很慢(只有当前节点获取了锁之后才会唤醒后继节点),以至于对于这些个线程来说会产生饥饿锁,导致他们几乎无法获取锁,从而无法获得执行的机会

    共享锁加锁失败

    好了,我们回到AQS中公平锁申请共享锁的方法实现:

    public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
      }    
    

    现在我们需要看看,如果tryAcquireShared返回了-1,那么doAcquireShared方法会做什么:

    private void doAcquireShared(int arg) {
        // 将失败的线程封装成一个节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 看到依然是头结点的后继节点会有申请加锁的机会
                    int r = tryAcquireShared(arg);
                    // 如果该节点加锁成功了
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }  
    

    首先,需要执行该方法,都是在线程加锁失败后,需要对其进行排队处理。可见其流程和普通的ReentrantLock一样,别无二致,如果需要阻塞当前线程的话会调用parkAndCheckInterrupt方法挂起线程,唯一需要关心的是当r >= 0,也就是当前线程获取共享锁成功后会调用setHeadAndPropagate方法,而在ReentrantLock中调用的是setHead方法。

      private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
         }
      }
    

    首先,我们可以发现该方法也调用了setHead(node)来将刚刚成功获取共享锁的节点设置为了头结点,接下来,如果等待队列头结点(这里使用头结点而不是node节点是因为并发环境下下一时刻node节点可能就不是头结点了)的下一个节点是null或者也是阻塞在共享锁上,那么就会执行doReleaseShared方法:

    private void doReleaseShared() {
       
        for (;;) {
            Node h = head;
            // 这里对应队列不止一个元素的情况
            if (h != null && h != tail) {
                int ws = h.waitStatus;
    
                // 对于head.waitStatus为SINGLE的情况,会loop知道成功的唤醒了其后继节点
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
    
                // 对于head.waitStatue为0的情况,会loop知道其状态改为了 传播
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }    
    

    该方法很有意思,首先我们可以通过unparkSuccessor这个方法就能知道,doReleaseShared方法的作用就是用来唤醒挂起的线程的,首先看最后一个if条件,这里是该for循环唯一的出口,如果h == head说明什么呢?我们知道,更改头结点的方法是setHead,如果调用了setHead的话,那么h != null则for循环会继续进行,但是由于setHead方法被调用了,那么此时再进入for循环执行unparkSuccessor方法唤醒的就是另外的线程了,所以每次进入for循环都是一次有用的(因为可能能够唤起一个挂起的线程)循环;如果h == head的话,那么多少次for循环都是无意义的,因为h == head有两种情况:

    1. 本次没有唤醒任何线程,那么再来多少次循环也没有用,因为它就是不会唤醒其他线程了
    2. 本次循环唤醒了线程,只是那个被唤醒的线程还未获得锁,也就是还未执行setHead方法

    我们可以理解,这么写是合理的,因为阻塞队列中的节点是排好队的,那么就应该依次唤醒,也就是一个节点只能被它前一个节点唤醒,所以你只能唤醒一个。

    还有一个有意思的地方时,h != head时说明执行了setHead方法,而setHead方法的执行只有在某一个阻塞的节点成功获取了锁成为头结点的时候才会调用,那么此时虽然doReleaseShared方法不会返回,但是此时的Head已经更新了,之后再唤醒线程的话,是唤醒的新的线程。所以这一系列操作,保证了队列中一段连续的、阻塞在共享锁上的线程被挨个唤醒,从而他们能够获得共享锁。这也是保证了共享锁共享的特性。

    对于上面提到的两个问题,没有看到有针对性的处理,所以暂时有以下回答:

    1. 对于独占线程来说,如果它也是第一次申请共享锁的话,在公平锁的情况下,其在必要时也需要排队
    2. 对于申请重入性共享锁的线程,加锁失败后会被添加到阻塞队列

    2. unLock()

    直接看该方法的实现:

    public void unlock() {
        sync.releaseShared(1);
    }  
    

    releaseShared方法的实现是在AQS中:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
              doReleaseShared();
            return true;
        }
        return false;
    }  
    

    首先我们的感觉,共享锁是非互斥的,那么你释放应该不会对其他的锁造成影响,这只是我的猜测,我们来看看tryReleaseShared方法:

    protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }  
    

    很简单,就像我们猜测的那样,共享锁的释放并不会影响到其他的线程,tryReleaseShared方法中只是为了维护该线程对应的HoldCounter对象,我们一开头就说了,这个对象记录了某一个线程所持有的共享锁的个数。注意,该方法返回true时,说明该线程已经释放了所持有的最后一个共享锁

    在tryReleaseShared方法执行成功完毕后,会执行doReleaseShared方法,这个方法我们在分析加锁的时候已经说过了。

    刚开始没有反应过来,按理共享锁的释放不会影响其他共享锁的获取啊,因为共享锁顾名思义不是互斥的啊,为何这里会执行doReleaseShared方法取尝试唤醒其他的线程?哈哈,按理说确实是这样,但是我们前面少说了一个特殊情况,那就是所有线程所持有的贡享锁的数量达到了上限,那么之后再想获取共享锁,只能被阻塞,对于这种情况,此时有人释放了锁,自然需要去主动唤醒一个阻塞在共享锁上的线程。

    2. 写独占锁

    1. lock()

    上面我们分析完了公平锁情况下,申请共享锁的情况,接下来我们看看申请独占锁,也就是写锁的情况:

    public void lock() {
        sync.acquire(1);
    }  
    

    从调用的形式来看,该方法和ReentrantLock是一样的,同样,acquire方法的实现在AQS中:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }  
    

    那么不同之处应该就在于各种回调方法的实现上了:

    protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }  
    

    我们依然将该方法拆开来看:

    1. 第一个if语句,如果这个语句为true的话,说明目前有线程持有锁,可以是任何锁,共享锁或者独占锁
      1. 第一个if语句:这里的情况是有人持有共享锁或者其他某个线程持有独占锁,这两种情况下,都会加锁失败,直接返回false
      2. 第二个if语句:这个if语句的情况是目前独占锁持有数量已达上限
      3. 在经过前面两个if语句多没有抛出异常或者返回false的情况下,说明是申请的重入性的独占锁,则无条件加锁成功
    2. 第二个if语句:如果能够执行到这里,说明当前c为0,也就是没有任何线程持有锁,那么此时申请独占锁就需要看看是否应该排队;而在公平锁的情况下,writerShouldBlock方法返回true说明队列中有人排在该线程之前,需要排队

    从这个方法的分析,我们可以知道在没有人持有锁的情况下,此时申请加锁,独占锁并不会有更高的优先级,也就是说如果队列前面有一个共享锁的申请在阻塞中,那么独占锁并不会插在共享锁的申请之前。

    独占锁加锁失败

    加锁失败的情况,调用acquireQueued方法,该方法的实现位于AQS中,与ReentrantLock的情况一致,这里并不做过多解释。

    2. unlock()

    public void unlock() {
        sync.release(1);
    }
    

    该方法同ReentrantLock中的一样也是在AQS中,我们来看看其方法实现:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }  
    

    我们首先看看tryRelease方法的实现:

    protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }  
    

    大体的思路和ReentrantLock中的一样,只不过这里记录独占锁的个数的变量不再是state,而是exclusiveCount:

    1. 首先检验当前释放独占锁的线程是不是真的持有独占锁
    2. 检验释放后独占锁还剩多少,更新state
    3. 如果为0则解绑独占线程,最终会返回是否还有独占锁

    好了,如果tryRelease方法返回true说明所有的独占锁都已经被释放了,那么此时就会执行unparkSuccessor方法唤醒等待队列中下一个节点

    二、非公平锁

    非公平锁与公平锁的区别就在于两个方法:

    static final class NonfairSync extends Sync {
        
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }  
    

    可见首先对于独占锁来说,writerShouldBolck永远返回false,这就会在某一个线程申请独占锁的时候产生非公平性。当持有独占锁的线程A刚刚执行了unlock方法时,此时其时间片仍未执行完,但是此时它再次申请独占锁,那么由于writeShouldBlock方法返回false,则它将直接加锁成功,而不必去理会此时是否有线程在队列中排在其之前

    我们再来看看共享锁的情况,这里是回调的apparentlyFirstQueuedIsExclusive方法

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }  
    

    我们只需要理会其何时返回true即可,我们可以认为有三个条件都满足时返回true说明申请共享锁的线程需要排队:

    1. 头结点不为null
    2. 头结点后继节点不为null
    3. 头结点的后继节点不是申请的共享锁(一般就是独占锁了)

    可见,这里还是照顾了一下有人申请独占锁的情况。我们回忆一下,在申请共享锁的时候,只有队列不为空而且线程是申请的普通共享锁的情况下才会查看是否需要排队的,所以这里处理了一下有人是独占锁的情况。

    在公平锁的情况下,只要队列不为空,那么都会往后进行排队,但是非公平锁的情况是,如果头结点的后继节点是一个阻塞的独占锁申请的话,你才需要去排队,否则的话,你可以直接加塞

    相关文章

      网友评论

        本文标题:Java并发--ReentrantReadWriteLock

        本文链接:https://www.haomeiwen.com/subject/rpfiphtx.html