美文网首页
ReentrantReadWriteLock源码分析

ReentrantReadWriteLock源码分析

作者: Longer_JzL | 来源:发表于2022-06-28 17:35 被阅读0次

    为什么要有读写锁

    ReentrantReadWriteLock 适用于读多写少的场景,标识同一时间,可以有多个线程并发读,但是不可以多个线程并发写。由于互斥锁,如:ReentrantLock,上锁后,无论你是读操作还是写操作,它们之间都是互斥的来保证线程安全,但是这样做的话,效率比较低。

    使用示例

    public class ReentrantReadWriteLockTest {
    
        public static void main(String[] args) throws InterruptedException {
            ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
            ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
            ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
    
            Thread r1 = new Thread(() -> {
                // 上读锁
                readLock.lock();
                try {
                    System.out.println("r1 开始读啦!!");
                } finally {
                    // 释放读锁
                    readLock.unlock();
                }
            });
    
    
            Thread r2 = new Thread(() -> {
                // 上读锁
                readLock.lock();
                try {
                    System.out.println("r2 开始读啦!!");
                } finally {
                    readLock.unlock();
                }
            });
    
            Thread w1 = new Thread(() -> {
                // 上读锁
                writeLock.lock();
                try {
                    System.out.println("w1 开始写啦!!");
                } finally {
                    // 释放写锁
                    writeLock.unlock();
                }
            });
    
            r1.start();
            r2.start();
            w1.start();
        }
    }
    
    

    核心思想

    • 基于AQS实现,用state来标识是否获取到锁,获取不到锁,线程交由AQS同步队列进行管理
    • 用32位的state来同时表示读锁和写锁,低16位表示写锁,高16位表示读锁
    • 公平与非公平
    • 写写互斥,读写互斥,读读共享
    • 锁降级

    源码分析

    构造方法

    public class ReentrantReadWriteLock
            implements ReadWriteLock, java.io.Serializable {
        private static final long serialVersionUID = -6992448646407690164L;
        /** Inner class providing readlock */
        private final ReentrantReadWriteLock.ReadLock readerLock;
        /** Inner class providing writelock */
        private final ReentrantReadWriteLock.WriteLock writerLock;
        /** Performs all synchronization mechanics */
        final Sync sync;
    
        /**
         * Creates a new {@code ReentrantReadWriteLock} with
         * default (nonfair) ordering properties.
         */
        public ReentrantReadWriteLock() {
            this(false);
        }
    
        /**
         * Creates a new {@code ReentrantReadWriteLock} with
         * the given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
        public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
        public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    }
    

    从构造方法上看,默认是使用非公平的方式获取锁

    获取写锁(互斥锁)

    AQS.aquire(int arg) 方法
         /**
         * 获取互斥锁的模版方法
         * 流程:调用子类实现的tryAcquire方法尝试获取锁,若获取锁成功,则终止,若获取不成功,则调用AQS的addWaiter方法,新增节点到AQS的双端同步队列里,
         * 然后调用acquireQueued方法再次进行判断获取锁资源,若还是获取不到锁,则将线程阻塞
         * @param arg
         */
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    ReentrantReadWriteLock. tryAcquire(int arg) 方法
            /**
             * 获取写锁(互斥锁)
             * @param acquires
             * @return true:获取锁成功;false:获取锁失败
             */
            protected final boolean tryAcquire(int acquires) {
                // 获取当前线程对象
                Thread current = Thread.currentThread();
                // 获取当前锁状态(AQS 的 state)
                int c = getState();
                // 获取写锁(互斥锁)数量
                int w = exclusiveCount(c);
                if (c != 0) { // 当前有锁状态(读锁或写锁,或两者都有)
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    /**
                     *  重要:
                     *  w==0 : 意味着目前没有线程持有写锁。则:(c != 0 && w == 0) 意味着当前有线程持有读锁,但是没有任何线程持有写锁
                     *  current != getExclusiveOwnerThread(): 意味着当前线程没有持有写锁。
                     *  所以:return false 有两种情况:1、当前有线程持有读锁(理论:不能够进行锁升级,读写互斥)   2、目前有其他线程持有写锁(理论:写写互斥)
                     *  这两种情况,满足其一,都会返回false --> 当前线程进去AQS队列进行等待调度
                     */
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    // 判断持写锁数量是否超过最大值,超过则报错
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    /**
                     * 走到这里意味着:(w != 0 && current == getExclusiveOwnerThread() && (w + exclusiveCount(acquires) < MAX_COUNT))= true
                     * 即:当前线程本身就持有了写锁,并且写锁数量小于最大值。这里相当于写锁的重入(锁重入)
                     */
                    setState(c + acquires);
                    return true;
                }
                /**
                 * 走到这里意味着:c==0 即 当前属于无锁状态。
                 * writerShouldBlock():判断是否需要阻塞当前线程,分公平和非公平两种情况。
                 * 公平:看在我前面是否有线程正在阻塞(AQS队列中,是否有节点正在等待唤醒),有:则writerShouldBlock() == true。若无:则为 false,表示当前线程不需要阻塞。
                 *      总结:公平的获取写锁的机制为:查看前面是否有线程持有锁(无论是读锁还是写锁)或有线程正在等待锁,若有,直接进入队列等待
                 * 非公平:writerShouldBlock()永远返回的是false,即不需要看前面是否有节点正在阻塞等待唤醒,直接上来就cas抢锁
                 *      总结:非公平的获取写锁的机制为:
                 *
                 */
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires)) // CAS抢锁
                    return false;
                // 走到这里意味着:线程抢锁(写锁)成功,将exclusiveOwnerThread属性设置为自己,标识我已经获得了写锁,你们其他线程进队列等待去吧
                setExclusiveOwnerThread(current);
                return true;
            }
    
    ReentrantReadWriteLock. addWaiter() 方法
        /**
         * 向队列添加新的节点,从尾巴插入
         * @param mode 新增的节点对象
         * @return 返回新的尾节点
         */
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
    /**
         * 向队列添加新的节点,从尾巴插入
         * @param node 新增的节点对象
         * @return 返回新的尾节点
         */
        private Node enq(final Node node) {
            for (;;) { // 自旋,向队列尾部新增节点,直到成功为止
                Node t = tail;
                if (t == null) { // Must initialize
                    /**
                     * 若当前的尾节点为空,说明现在还没有任何节点插入到队列内,此时我们对头节点和尾节点进行初始化new Node()。
                     * 因此得出一个结论:头节点在后续的逻辑中,永远不可能为空
                     */
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) { // 将当前节点设置为新的尾节点
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    我们可以看到,在调用enq()方法前,程序会先进行cas做节点插入逻辑,但是enq()方法也有同样的插入逻辑,这样做会不会感觉逻辑重复了,是不是多余了?
    实际上这是一个优化,大部分情况下是不用走到enq方法里去自旋插入节点的,为了提高性能,避免进入for循环,这里将插入操作提前了。

    AQS.acquireQueued(final Node node, int arg) 方法
    /**
         * 获取锁资源,获取不到则对线程进行阻塞
         * @param node 已经在队列里的节点
         * @param arg
         * @return
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                // 是否被中断标识
                boolean interrupted = false;
                for (;;) { // 自旋
                    // 获取前驱节点
                    final Node p = node.predecessor();
                    /**
                     *  若前驱节点为头节点,那么当前线程此时立刻尝试获取锁
                     *
                     *  问:为什么要在这里获取锁?
                     *  答:因为前驱节点如果是头节点的话,会存在两种情况:
                     *      1. 该头结点是初始化的虚节点,那么当前线程对应的node节点就应该是队列里的第一个"线程节点",那么此时应该直接尝试获取锁,并且将头结点更新为当前节点
                     *      2. 该头节点不是虚节点,是正常的"线程节点",那么此时很有可能头结点释放了锁,那么此时去尝试获取锁就有比较大的概率获取到,并且将头结点更新为当前节点
                     */
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        // 抢锁成功,return false
                        return interrupted;
                    }
                    // 判断线程是否可以阻塞,可以的话则进行阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                // 如果上面的代码,出现异常,那么这里进行兜底处理,会判断failed标识符,若为true,则调用cancel方法将节点失效掉
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    
     /**
         * 判断当前线程是否可以"安心"的阻塞
         * @param pred
         * @param node
         * @return
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 获取当前线程的前驱节点的状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                // 若前驱节点的状态为SIGNAL(-1),则直接返回true,表示可以安心的进行阻塞了
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                /**
                 * 若前驱节点的状态>0,即为CANCELLED(1)状态,则遍历往前寻找到一个CANCELLED状态节点为止,并将寻找到的节点与本节点进行关联。
                 * 关联完后,返回false,由上游程序自旋重新调用本方法进行判断
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                /**
                 * 若前驱节点的状态不为SIGNAL(-1),CANCELLED(1)。状态可能为:CONDITION(-2),PROPAGATE(-3)或 初始化(0)状态,
                 * 则将前驱节点的状态变成SIGNAL后,由上游程序自选重新调用本方法进行判断
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
        * 阻塞当前线程
         * @return 被中断,则返回true,否则 返回false
         */
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    AQS.cancelAcquire方法
        /**
         * 取消节点。
         * 流程:将当前节点Thread置为空-->获取到当前节点往前的第一个有效节点(非CANCELLED状态)-->将当前节点的状态置为CANCELLED(失效)-->将当前节点从队列内清除出去-->唤醒后继节点
         * 虽然在这个方法里,会将当前节点从队列内清除出去,但是在并发情况下,其他线程有可能获取到当前节点的状态为CANCEELED
         * @param node
         */
        private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            // 日常判空
            if (node == null)
                return;
    
            node.thread = null;
    
            // 获取前驱节点
            Node pred = node.prev;
            // 从后往前遍历获取到不为CANCELLED状态的前驱节点为止
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            // 获取前驱节点的下一个节点
            Node predNext = pred.next;
    
            // 将当前线程节点的状态变为CANCELLED状态
            node.waitStatus = Node.CANCELLED;
    
            /**
             * 若当前线程节点为队列内的尾节点,则将上面找到的有效的前驱节点作为新的尾节点(CAS操作,将tail变量赋值为pred),成功后tail==pred
             */
            if (node == tail && compareAndSetTail(node, pred)) {
                /**
                 * 重点:
                 * 由于是双向练表,所以这里需要将新的尾节点(pred)的next节点cas为null。此时的状态为:pred->null pred<-node,当前节点的前驱指针还是指向pred的。
                 * 由于此时next-null,所以整条链路的next是不完整的,所以后续遍历查找节点应该从后往前利用pred指针找
                 */
                compareAndSetNext(pred, predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                /**
                 * 进到这里意味着:当前的节点不是尾节点 或 上面cas更新尾节点失败(并发引起)
                 */
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        // 将上面找到的前驱节点与当前线程的后继节点相连(CAS next指针)
                        compareAndSetNext(pred, predNext, next);
                } else {
                    // 唤醒后继节点
                    unparkSuccessor(node);
                }
                // 当前节点的后继节点指向自己,方便GC
                node.next = node; // help GC
            }
        }
    
    

    释放写锁(互斥锁)

    AQS.release(int arg) 和unparkSuccessor(Node node)方法
        // 释放互斥锁
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    // 唤醒后继节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        // 唤醒后继节点
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            // 当前释放锁的节点的状态
            int ws = node.waitStatus;
            if (ws < 0)
                // 若当前节点状态不是CANCELLED状态(即:是有效节点),则直接cas,将当前节点状态修改为0(初始化状态)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            // 获取后继节点
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                /**
                 * 若当前节点的后继节点为空 或者 后继节点的状态为CANCELLED状态,则从后往前找到第一个状态不为CANCELLED的节点
                 * 问:为什么要从后往前找?
                 * 答:这是由于在取消节点的cancelAcquire()方法里,我们取消节点时,会有短暂的时刻导致next指针不完整,但是pre指针对整条链路来说是完整的,所以需要从后往前找。详细看cancelAcquire方法
                 */
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                // 唤醒上面找到的当前线程的后继有效节点
                LockSupport.unpark(s.thread);
        }
    
    
    

    获取读锁(共享锁)

    AQS.acquireShared(int arg) 方法
        /**
         * 获取共享锁,若获取不成功,则将线程放到队列里面去等待。
         * 流程:调用子类的tryAcquireShared方法尝试获取锁,若获取锁不成功,则调用doAcquireShared方法,入队等待
         * @param arg
         */
        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    ReentrantReadWriteLock.tryAcquireShared(int arg) 方法
            /**
             * 获取读锁(共享锁)
             * @param unused
             * @return
             */
            protected final int tryAcquireShared(int unused) {
                // 获取当前线程对象
                Thread current = Thread.currentThread();
                // 获取当前锁状态
                int c = getState();
                /**
                 * exclusiveCount(c) != 0 意味着 当前有线程持有写锁(互斥锁)
                 * getExclusiveOwnerThread() != current 意味着 当前持有写锁(互斥锁)的线程不是本线程
                 */
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current) // 这个判断意味着,同个线程,获取完写锁后,也可以直接获取读锁,即同个线程同时可以持有写锁和读锁(支持锁降级)
                    // 走到这里表示:当前有其他线程持有写锁,那么本线程获取读锁失败,直接返回-1,交由AQS将本线程放入队列内等待唤醒调度
                    return -1;
                // 获取读锁数量
                int r = sharedCount(c);
                /**
                 * readerShouldBlock() 判断是否需要阻塞当前线程,分公平和非公平两种情况。
                 * 公平: 看在我前面是否有线程正在阻塞(AQS队列中,是否有节点正在等待唤醒),有:readerShouldBlock() == true。若无:则为 false,表示当前线程不需要阻塞。
                 * 非公平:看我前面是否有线程正在等待着获取写锁(即互斥节点),若有,则readerShouldBlock() == true。若无:则为 false,表示不需要阻塞,可以直接进行cas抢锁
                 * 问:非公平方式获取读锁,为什么需要判断队列里面是否有互斥节点(等待获取写锁的线程)?
                 * 答:防止写线程饥饿。如果获取读锁不需要判断队列内是否有线程在等待获取写锁的话,那么如果有大量的线程在争抢读锁的情况的话,
                 * 写线程将会很久拿不到写锁,将会长久阻塞,不利于写线程的执行。这就是线程饥饿,所以才需要在获取读线程先判断是否有线程正在等待获取写线程。
                 * 若有,则读线程也需要进入队列排队等待,若没有,则读线程直接上去CAS抢锁
                 */
                if (!readerShouldBlock() &&
                    r < MAX_COUNT && // 判断读锁数量是否小于最大值
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    // 走到这里意味着:目前队列中没有等待的线程(公平) 或 没有等待获取写锁的线程(非公平),并且读锁数量在最大值范围内,且cas获取读锁成功
                    if (r == 0) { // 读锁数量为0,即:我是第一个获取到读锁的线程
                        /**
                         * 将当前线程标识为第一个获取读锁的线程。思考:为什么要这样?有什么好处?
                         * 答:提升性能,因为维护ThreadLocal成本比较高
                         */
                        firstReader = current;
                        // 第一个获取读锁的线程对应的锁重入次数
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        // 进到这里来意味着:程序允许到现在,只有一个线程在一直持有锁,就是当前线程,所以将读锁重入次数++
                        firstReaderHoldCount++;
                    } else {
                        /**
                         * 进到这里意味着:已经有其他线程持有读锁了,那么我需要在ThreadLocal里维护自己的读锁重入次数
                         * cachedHoldCounter:为HoldCounter类型对象,始终保存最近(最后)获取锁的线程及其锁重入次数
                         */
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                        /**
                         * 进到这里意味着:
                         * 情况1:cachedHoldCounter == null ,那么此时需要进行更新,更新最后获取锁的线程引用
                         * 情况2:当前标识的最后获取锁的线程对象不是自己本身,那么也需要对其进行更新
                         */
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                        /**
                         * 进到这里意味着:当前标识的最后获取锁的线程是自己本身,并且锁数量被释放完了(rh.count == 0)
                         * 问:为什么需要readHolds.set(rh)?
                         * 答:怎么样才能让程序进入到这里,就是最后一个线程在获取锁后,又将锁给释放掉,然后又重新获取锁。因为释放锁的同时,
                         * 会将count--,当减到0时,会将当前线程在ThreadLocal里的引用给remove掉,所以才需要这里的set操作。
                         * new Thread(()->{
                         *             readLock.lock();
                         *             readLock.unlock();
                         *             readLock.lock();
                         *         }).start();
                         */
                            readHolds.set(rh);
                        rh.count++;
                    }
                    // 返回抢锁成功
                    return 1;
                }
                /**
                 * 走到这里意味着:
                 * 1、readerShouldBlock() == true 即:判断到当前线程需要进行阻塞。
                 *    公平:表明前面有线程等待获取锁(无论是读锁还是写锁)。
                 *    非公平:表明前面有写线程正在等待获取写锁(队列内有互斥节点)
                 * 2、r < MAX_COUNT 读锁数量超过最大限制
                 * 3、CAS抢锁失败
                 */
                return fullTryAcquireShared(current);
            }
    
    
            /**
             * 一个完整的获取读锁(共享锁)的方法。当调用tryAcquireShared()获取锁没成功,就会调用此方法来获取读锁。
             * 这里解释一下:tryAcquireShared()方法上面讲了获取锁的流程,其实是作者为了性能做的一个优化,一般情况下是不用走到fullTryAcquireShared()这个方法的,
             * 除非在tryAcquireShared()方法中获取锁不成功就会走到这个方法来死循环重试拿锁。
             */
            final int fullTryAcquireShared(Thread current) {
                HoldCounter rh = null;
                // 自旋抢锁
                for (;;) {
                    // 获取当前锁状态
                    int c = getState();
                    // 判断当前有没有线程持有写锁(互斥锁),0表示没有,>0表示有
                    if (exclusiveCount(c) != 0) {
                        // 判断当前持有写锁的线程是不是当前线程本身,若不是,则return -1 ,交由AQS管理,让本线程进入队列等待
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
                    } else if (readerShouldBlock()) { // 判断当前读线程是否需要被阻塞。
                        // Make sure we're not acquiring read lock reentrantly
                        /**
                         * 进到这里意味着:
                         * 1、公平:判断到队列内有线程正在等待获取锁
                         * 2、非公平:判断到队列内有写线程在等待获取锁
                         */
                        if (firstReader == current) {
                            // 若当前线程是已经获取过读锁的第一个线程,那么啥这里啥也不做,交由最下面的代码进行CAS操作
                            // assert firstReaderHoldCount > 0;
                        } else {
                            // 判断rh==null,为什么需要判断是否为空,是因为有可能同一个线程,在下面CAS抢锁失败后,重新循环抢锁,这时,rh就可能不为空
                            if (rh == null) {
                                // cachedHoldCounter:为HoldCounter类型对象,始终保存最近(最后)获取锁的线程及其锁重入次数。也是一种优化手段,避免维护ThreadLocal。
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)
                            /**
                             * 走到这里意味着:线程在下面CAS失败后,重新循环从头开始走逻辑,但是此时,当前线程的锁已经被释放了。
                             * 此时直接返回-1,让当前线程进入队列等待
                             */
                                return -1;
                        }
                    }
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    /**
                     * 走到这里意味着:
                     * 情况1:当前持有互斥锁的线程是当前线程本身
                     * 情况2:当前线程判断到不需要阻塞(公平:前面没有任何线程在等待获取锁,那么直接CAS获取锁。非公平:前面没有任何线程在等待获取锁 或 前面没有写线程在队列里等待)
                     * 情况3:当前线程是firstReader线程
                     */
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        // 返回抢锁成功
                        return 1;
                    }
                }
            }
    
    
    
    AQS.doAcquireShared(int arg) 方法
        /**
         * 获取共享锁,不成功则将线程进行阻塞
         * @param arg
         */
        private void doAcquireShared(int arg) {
            // 向队列新增一个共享节点
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {// 自旋
                    // 获取前驱节点
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 若前驱节点为头节点,则尝试获取锁
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 若获取锁成功,则设置新的头节点,并结束程序
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    // 若获取锁失败,或前驱节点不是头节点,阻塞当前线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    // 如果上面的代码,出现异常,那么这里进行兜底处理,会判断failed标识符,若为true,则调用cancel方法将节点失效掉
                    cancelAcquire(node);
            }
        }
    
    

    释放共享锁

    AQS.releaseShared(int arg)
        /**
         * 流程:调用子类tryReleaseShared方法尝试释放锁,释放锁成功后,则调用doReleaseShared方法,唤醒下一个节点进行抢锁
         * @param arg
         * @return
         */
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    
    ReentrantReadWriteLock.tryReleaseShared(int unused)
           /**
             * 释放共享锁(读锁)
             * @param unused
             * @return
             */
            protected final boolean tryReleaseShared(int unused) {
                // 获取当前线程对象
                Thread current = Thread.currentThread();
                // 判断当前线程是否是firstReader线程
                if (firstReader == current) {
                    if (firstReaderHoldCount == 1)
                        // 若本线程的读锁重入次数此时判断已经是为1了,那么本次释放就会将这个线程的读锁全部释放完,那么也需要释放本线程,所以将firstReader引用指向空
                        firstReader = null;
                    else
                        // 读锁重入次数--
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                    /**
                     * 进到这里意味着:
                     * 情况1:cachedHoldCounter == null ,注意,这里跟获取共享锁的逻辑不同,这里是释放锁,所以不用更新cachedHoldCounter引用
                     * 情况2:当前线程不是cachedHoldCounter线程(不是最后获取锁的线程),那么当前线程的持有锁的重入次数情况需从readHolds(ThreadLocal)里面获取
                     */
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        // 锁重入次数<=1,那么本次释放后,需要将本个线程也释放掉,那么readHolds里面也不需要再维护本线程的持锁重入情况了,所以需要remove掉
                        readHolds.remove();
                        if (count <= 0) // 释放锁太多次,导致益处,则抛异常
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
    
                // 自旋 CAS释放锁,即CAS减少state值,直到CAS成功为止
                for (;;) {
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        // 释放锁成功,判断到锁重入次数为
                        return nextc == 0;
                }
            }
    
    AQS.doReleaseShared()方法
        /**
         * 释放共享锁,唤醒后续等待的线程节点
         */
        private void doReleaseShared() {
            for (;;) {
                // 获取当前的头节点
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) { // 若节点状态为SIGNAL(-1),则cas将状态置为初始化状态(0)
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        // 上面cas成功,则唤醒下一个节点
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    锁降级

    锁降级:表示同一个线程,获取了写锁后,在释放写锁前,又获取了读锁,我们称这个过程叫做锁降级。通过对tryAcquireShared源码的分析,ReentrantReadWriteLock是允许锁降级的。

    小试牛刀

    • 问题1:ReentrantReadWriteLock是如何解决线程饥饿的?
    • 问题2:firstReader和firstReaderHoldCount的作用是什么?
    • 问题3:所有读线程维护自身ThreadLocalHoldCounter,将所有的ThreadLocalHoldCounter里的count相加,是否与state的高16位的值一致?
    • 问题4:什么是锁降级?
    • 问题5:AQS的同步队列为什么是双端队列?

    这里只对问题5进行一个回答,其他的问题通过上面的源码分析,都可以找到答案。

    AQS的同步队列为什么是双端队列?
    主要原因是:使用双端队列,可以提高性能,避免节点多线程操作冲突。一般情况下,会从头将节点摘除,然后从尾添加节点,这样可以减少节点的并发操作冲突。这个思想在ForkJoinPool里也是可以看到的。

    相关文章

      网友评论

          本文标题:ReentrantReadWriteLock源码分析

          本文链接:https://www.haomeiwen.com/subject/uyoavrtx.html