美文网首页Java
ReentranReadWriteLock源码浅析

ReentranReadWriteLock源码浅析

作者: 小金狗子 | 来源:发表于2020-01-21 14:52 被阅读0次

    本文为作者个人理解难免有误,欢迎指正

    请先阅读ReentranLock源码浅析

    ReentranReadWriteLock 的基本构成

      private final ReentrantReadWriteLock.ReadLock readerLock;
        private final ReentrantReadWriteLock.WriteLock writerLock;
        final Sync sync;
        public static class ReadLock
        public static class WriteLock
    

    ReentranReadWriteLock的一个要点

    内部的Sync中有如下定义。

    c指的AbstractQueuedSynchronizer的state。SHARED_UNIT为65536。

            static final int SHARED_SHIFT   = 16;
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
            /** Returns the number of shared holds represented in count  */
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            /** Returns the number of exclusive holds represented in count  */
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    

    ReadLock加锁

    读锁调用的AbstractQueuedSynchronizer中acquireShared,方法的实现在读锁内部,方法内部一段代码注释帮助大家理解

      public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
      protected final int tryAcquireShared(int unused) {
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    

    我们将焦点聚集在tryAcquireShared上。
    第一次加锁时state为0,exclusiveCount和sharedCount(c)也是0,经过compareAndSetState(c, c + SHARED_UNIT),c变为65536
    如果相同线程再次加读锁,则c的值再次加65536,firstReaderHoldCount变为2
    假设此刻c为65536,不同线程尝试加读锁时,sharedCount(c)计算如下为例,将65536表示为32位二进制

    00000000000000010000000000000000

    无符号右移16位

    00000000000000000000000000000001

    对应int值1,表示当前读锁加锁数量。此时代码进入

        else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
    

    创建HoldCounter,并将count设置为1,线程号为当前线程的Id。HoldCounter可以理解为某线程加锁的次数。

    如果线程因为并发原因导致无法进入if块,比如在CAS c时失败,就进入fullTryAcquireShared,代码和上述大同小异,主要是进入 for (;;){}循环尝试不停加锁。

    如果因为别的线程加了写锁,则因为如下代码,加锁失败。

    if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
    

    纵观try的过程,两个返回值1代表加锁成功,-1代表加锁失败。加锁失败时就会执行doAcquireShared,将当前线程作为一个节点加入双向链表。

    方法的具体代码和之前的很相似。区别主要有两点:1.Node的类型为SHARED,2.setHeadAndPropagate替换了setHead,重点在于此。

    int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
    
    
    private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    

    满足一系列的判断会调用doReleaseShared。原因是现在加的共享锁,所以需要唤醒后继需要加共享锁的节点去加锁。

    
    private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    

    doReleaseShared做的事情就是如果当前head节点是SIGNAL,就唤醒后继节点,否则将状态设为PROPAGATE,如果head节点变化了,就结束。

    首先如果所有的加锁都是读锁,是不会有一个链表的,必然要么是有线程加了读锁后又有线程尝试加写锁,或者相反。
    如下图所示,N0为head,类型为共享锁,所以首先唤醒N0,unpark后循环继续,在判断h==head前,类型为独占的N1加入进来了,但还未更改状态,所以N1的状态随时有可能变化,从0变成了SIGNAL
    或者还没变化,但如果节点一直处于链表中没有唤醒,它的状态会变为SIGNAL

    unpark前一瞬间的列表

    
            +------+        +------+ 
            | N0(S)| <----  | N1(E)| 
            |SIGNAL| ---->  |DEFAULT|
            +------+        +------+ 
    
    

    unpark后一瞬间的列表,此后循环会尝试将DEFAULT值改为PROPAGATE

    
     +------+ 
     | N1(E) | 
     |DEFAULT|
     +------+ 
    
    

    此后又有类型为共享的N2加入进来,此刻N1的值可能为PROPAGATE或者SIGNAL

    
            +---------+        +------+ 
            |  N1(E)  | <----  |  N2(S)| 
            |PROPAGATE| ---->  |DEFAULT|
            +---------+        +------+ 
    
    

    独占锁在释放时判断只要状态不为0,就可以唤醒后继者,所以N1的状态为PROPAGATE也没有关系。

     public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    WriteLock加锁

    直接看下已经有线程对加锁的情况下获取写锁的流程。

     if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
    
    protected final boolean tryAcquire(int acquires) {
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    
    

    如果已经有线程持有读/写锁, c为65536或者其倍数,假设为65536,那么经过exclusiveCount后,即65536&65535,结果为0。
    此时getExclusiveOwnerThread()返回null/持有写锁的线程实例,所以加锁失败,那么剩下逻辑就是上篇对AbstractQueuedSynchronizer acquire方法的分析。

    如果是同一线程写锁重入,则进行运算1&65535 结果为1,加上已经重入的次数只要不超过最大值就对c+1

    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);
    

    如果加写锁失败,那么流程将进入AbstractQueuedSynchronizer acquireQueued中。相关内容及释放过程已经在前文中描述过了。

    相关文章

      网友评论

        本文标题:ReentranReadWriteLock源码浅析

        本文链接:https://www.haomeiwen.com/subject/ofsezctx.html