ReentrantLock Condition Source Code Analysis

Author: 想起个帅气的头像 | Published: 2020-11-27 22:53

    This article explains how the await/signal methods of a ReentrantLock Condition are implemented.

    If you want to skip the analysis, jump straight to the summary at the end.

    Usage

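    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public void foo() throws InterruptedException {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();

        reentrantLock.lock();
        try {
            // Releases the lock, joins the condition queue and parks until signalled
            condition.await();
            //....
            // Usually called by another thread that holds the same lock
            condition.signal();
        } finally {
            reentrantLock.unlock();
        }
    }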
    

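    After acquiring the lock, the current thread calls await. This releases the lock, puts the thread in the condition's wait queue and parks it.
    signal wakes a thread that is parked in that wait queue. The thread is moved back to the lock's sync queue and competes for the lock again once the signalling thread unlocks.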
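    The snippet above puts both calls in one method only for brevity. In practice await and signal run in different threads. The sketch below is a minimal runnable version. It assumes a hypothetical `AwaitSignalDemo` class with a boolean `flag` as the wait predicate. The waiter re-checks the predicate in a loop, and the main thread sets it and signals while holding the lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: one thread waits until `flag` is true, the main thread sets it and signals.
class AwaitSignalDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false; // guarded by lock
    private int observed = 0;     // guarded by lock

    int run() throws InterruptedException {
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                // Re-check the predicate in a loop: await() releases the lock while parked
                // and re-acquires it before returning.
                while (!flag) {
                    ready.await();
                }
                observed = 42;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        lock.lock();
        try {
            flag = true;
            ready.signal(); // no-op if the waiter has not reached await() yet
        } finally {
            lock.unlock();  // the signalled waiter can only re-acquire the lock after this
        }

        waiter.join();
        lock.lock();
        try {
            return observed;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new AwaitSignalDemo().run()); // prints 42
    }
}
```

    If the main thread happens to set the flag before the waiter starts, the waiter never calls await at all. The predicate loop makes both orderings correct.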

    Source code analysis

    await() #1

    /**
             * Implements interruptible condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with saved state as argument,
             *      throwing IllegalMonitorStateException if it fails.
             * <li> Block until signalled or interrupted.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * </ol>
             */
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                // Part 1
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // Part 2
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

    await is fairly long, so it is split into two parts. Part 1 covers how the current thread gets parked. Part 2 covers what happens after it is unparked.

    Part 1:

    addConditionWaiter()

            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;
    
            /**
             * Adds a new waiter to wait queue.
             * @return its new wait node
             */
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    

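    Its main purpose is to wrap the current thread in a Node in CONDITION mode and append it to the condition queue.
    Initially the queue is empty, so firstWaiter and lastWaiter are both null. Once the first node is created, firstWaiter and lastWaiter both point to it. For every later node, the current tail's nextWaiter is set to the new node and lastWaiter is moved to the new node. The result is a singly linked list with head and tail pointers.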

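    Now look at the if check at the start of the method. Every node that enters the condition queue starts in CONDITION (-2) state. If the tail is no longer in that state, its thread has already moved on to other logic, which in practice means the wait was cancelled. In that case the cancelled nodes are removed from the list first.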
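    The list construction described above can be modelled without any concurrency. The sketch below is a simplified single-threaded model. It uses a hypothetical class `ConditionQueueModel` and leaves out the cancelled-tail cleanup. It mirrors the append logic of `addConditionWaiter` using `firstWaiter`, `lastWaiter` and `nextWaiter`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of the condition queue appended to by addConditionWaiter().
// The real AQS Node also has thread/prev/next fields; only the condition-queue link is kept here.
class ConditionQueueModel {
    static final int CONDITION = -2;

    static final class Node {
        final String name;            // stands in for the waiting thread
        int waitStatus = CONDITION;
        Node nextWaiter;              // the only link in the condition queue
        Node(String name) { this.name = name; }
    }

    Node firstWaiter; // head pointer
    Node lastWaiter;  // tail pointer

    // Append logic of addConditionWaiter(); the cancelled-tail cleanup is left out.
    Node addWaiter(String name) {
        Node node = new Node(name);
        Node t = lastWaiter;
        if (t == null)
            firstWaiter = node;       // empty queue: node becomes both head and tail
        else
            t.nextWaiter = node;      // link after the current tail
        lastWaiter = node;
        return node;
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter)
            out.add(n.name);
        return out;
    }

    public static void main(String[] args) {
        ConditionQueueModel q = new ConditionQueueModel();
        q.addWaiter("T1");
        q.addWaiter("T2");
        q.addWaiter("T3");
        System.out.println(q.names()); // prints [T1, T2, T3]
    }
}
```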

    /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    

    unlinkCancelledWaiters

    /**
             * Unlinks cancelled waiter nodes from condition queue.
             * Called only while holding lock. This is called when
             * cancellation occurred during condition wait, and upon
             * insertion of a new waiter when lastWaiter is seen to have
             * been cancelled. This method is needed to avoid garbage
             * retention in the absence of signals. So even though it may
             * require a full traversal, it comes into play only when
             * timeouts or cancellations occur in the absence of
             * signals. It traverses all nodes rather than stopping at a
             * particular target to unlink all pointers to garbage nodes
             * without requiring many re-traversals during cancellation
             * storms.
             */
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }
    

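    In short, it does one thing: it removes every node that is not in CONDITION state from the list. Back in addConditionWaiter, lastWaiter is therefore either a CONDITION node or null, and it is reassigned to t.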
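    The one-pass unlink can also be shown as a single-threaded model. The sketch assumes a hypothetical `UnlinkModel` class whose nodes carry only a name, a `waitStatus` and `nextWaiter`. The `trail` pointer remembers the last node still in CONDITION state. Each cancelled node is bypassed during a single traversal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of unlinkCancelledWaiters().
class UnlinkModel {
    static final int CANCELLED = 1;
    static final int CONDITION = -2;

    static final class Node {
        final String name;
        final int waitStatus;
        Node nextWaiter;
        Node(String name, int waitStatus) { this.name = name; this.waitStatus = waitStatus; }
    }

    Node firstWaiter, lastWaiter;

    void append(String name, int waitStatus) {
        Node n = new Node(name, waitStatus);
        if (lastWaiter == null) firstWaiter = n; else lastWaiter.nextWaiter = n;
        lastWaiter = n;
    }

    // Same single pass as AQS: `trail` is the last node seen that is still in CONDITION state.
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;                 // detach the cancelled node
                if (trail == null) firstWaiter = next;
                else trail.nextWaiter = next;        // bypass it
                if (next == null) lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) out.add(n.name);
        return out;
    }

    public static void main(String[] args) {
        UnlinkModel q = new UnlinkModel();
        q.append("A", CANCELLED);
        q.append("B", CONDITION);
        q.append("C", 0);          // e.g. a wait cancelled by an interrupt (waitStatus reset to 0)
        q.append("D", CONDITION);
        q.append("E", CANCELLED);
        q.unlinkCancelledWaiters();
        System.out.println(q.names() + " last=" + q.lastWaiter.name); // prints "[B, D] last=D"
    }
}
```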

    fullyRelease()

    /**
         * Invokes release with current state value; returns saved state.
         * Cancels node and throws exception on failure.
         * @param node the condition node for this wait
         * @return previous sync state
         */
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    
    /**
         * Releases in exclusive mode.  Implemented by unblocking one or
         * more threads if {@link #tryRelease} returns true.
         * This method can be used to implement method {@link Lock#unlock}.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryRelease} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @return the value returned from {@link #tryRelease}
         */
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

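    savedState is the reentrancy count, which may be 1 or more. All of it is released in one go: the global state becomes 0, exclusiveOwnerThread becomes null, and unparkSuccessor wakes the next node in the sync queue. The details were covered in the ReentrantLock source analysis. If the current thread does not own the lock, tryRelease throws IllegalMonitorStateException and the finally block marks the node CANCELLED.
    In short, the current thread gives up the lock completely and lets the next node in the sync queue compete for it.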
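    The effect of saving and restoring the full reentrancy count can be observed through the public `ReentrantLock` API (`getHoldCount`, `hasWaiters`). In this sketch, which uses a hypothetical class `FullyReleaseDemo`, the waiter locks three times before calling await. The main thread can still acquire the lock and signal. After await returns, the waiter's hold count is 3 again.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; uses only public ReentrantLock/Condition methods.
class FullyReleaseDemo {
    static int run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false};  // guarded by lock
        int[] restoredHolds = {-1};

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();
            lock.lock();                                  // hold count is now 3
            try {
                while (!signalled[0])
                    cond.await();                         // releases all 3 holds while waiting
                restoredHolds[0] = lock.getHoldCount();   // all 3 holds are back
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        while (true) {
            lock.lock();
            try {
                // hasWaiters() == true while we hold the lock means the waiter is inside
                // await() and has given up every one of its three holds.
                if (lock.hasWaiters(cond)) {
                    signalled[0] = true;
                    cond.signal();
                    break;
                }
            } finally {
                lock.unlock();
            }
            Thread.sleep(1);
        }
        waiter.join();
        return restoredHolds[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints 3
    }
}
```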

    isOnSyncQueue()

    /**
         * Returns true if a node, always one that was initially placed on
         * a condition queue, is now waiting to reacquire on sync queue.
         * @param node the node
         * @return true if is reacquiring
         */
        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it.  It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             */
            return findNodeFromTail(node);
        }
    
        /**
         * Returns true if node is on sync queue by searching backwards from tail.
         * Called only when needed by isOnSyncQueue.
         * @return true if present
         */
        private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
    

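    This checks whether the node is already in the sync queue. It uses waitStatus, prev and next, and finally a backwards search from tail.
    The long comment about the backwards search says that a non-null node.prev alone does not prove the node is in the sync queue. The only sure way is to walk backwards from tail. The comment also notes that the node will almost always be near the tail, so the walk costs very little.

    The reason lies in how nodes are enqueued, i.e. the enq() method shown in the transferForSignal section below. node.prev is assigned first; prev is volatile, so the write is visible to other threads immediately. Only then is compareAndSetTail attempted. If that CAS fails, the node has a non-null prev but has not actually joined the queue.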
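    The ordering problem behind `findNodeFromTail` can be reproduced deterministically by freezing `enq()` between its two steps. This is a simplified model. It uses a hypothetical class `PrevBeforeCasModel`, with `AtomicReference` standing in for the AQS `compareAndSetTail`. After `node.prev` is written but before the tail CAS, the node has a non-null `prev` yet cannot be found from `tail`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-threaded model: enq() writes node.prev first and CASes tail second.
// Pausing between the two steps shows why isOnSyncQueue() falls back to findNodeFromTail().
class PrevBeforeCasModel {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    // Existing tail of the sync queue (e.g. the dummy head).
    final AtomicReference<Node> tail = new AtomicReference<>(new Node());

    // Same backwards walk as AQS findNodeFromTail().
    boolean findNodeFromTail(Node node) {
        Node t = tail.get();
        for (;;) {
            if (t == node) return true;
            if (t == null) return false;
            t = t.prev;
        }
    }

    // Returns {foundAfterPrevWrite, foundAfterTailCas}.
    static boolean[] demo() {
        PrevBeforeCasModel q = new PrevBeforeCasModel();
        Node node = new Node();
        Node t = q.tail.get();

        node.prev = t;                                          // enq() step 1: prev is non-null now
        boolean foundAfterPrevWrite = q.findNodeFromTail(node); // false: tail CAS not done yet

        if (q.tail.compareAndSet(t, node))                      // enq() step 2: CAS on tail
            t.next = node;
        boolean foundAfterTailCas = q.findNodeFromTail(node);   // true

        return new boolean[] {foundAfterPrevWrite, foundAfterTailCas};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // prints "false true"
    }
}
```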


    If the node is not in the sync queue yet, the thread parks:

          while (!isOnSyncQueue(node)) {
              LockSupport.park(this);
              if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                  break;
          }
    

    That completes Part 1: the thread that held the lock has been added to the condition queue, has released the lock, and is now parked.

    Part 2:

    Since the thread is now parked, let us first look at how it gets unparked. await is normally used together with signal (or signalAll), so signal() comes first.

    signal()

    /**
             * Moves the longest-waiting thread, if one exists, from the
             * wait queue for this condition to the wait queue for the
             * owning lock.
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    

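    signal first checks that the calling thread holds the lock and throws IllegalMonitorStateException if it does not. If the condition queue contains a node, signalling starts from the first one.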

    doSignal()

     /**
             * Removes and transfers nodes until hit non-cancelled one or
             * null. Split out from signal in part to encourage compilers
             * to inline the case of no waiters.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    

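    Each iteration first advances firstWaiter to first.nextWaiter. If that is null, first was the only node in the condition queue, so lastWaiter is set to null as well and the queue becomes empty.
    Either way, first.nextWaiter is then cleared to detach first from the queue. If transferForSignal fails because the node was cancelled, the loop moves on to the new firstWaiter. It stops once one node is transferred or the queue is empty.
    The key method is transferForSignal.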
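    The loop above can be traced with a single-threaded model. In this sketch, `DoSignalModel` and its `transfer` method are hypothetical. `transfer` only stands in for `transferForSignal` by failing on nodes that are no longer in CONDITION state. As a result, a cancelled head is skipped and the next node is transferred.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of signal()/doSignal().
class DoSignalModel {
    static final int CANCELLED = 1;
    static final int CONDITION = -2;

    static final class Node {
        final String name;
        int waitStatus;
        Node nextWaiter;
        Node(String name, int waitStatus) { this.name = name; this.waitStatus = waitStatus; }
    }

    Node firstWaiter, lastWaiter;
    final List<String> syncQueue = new ArrayList<>(); // names of nodes moved to the "sync queue"

    void append(String name, int waitStatus) {
        Node n = new Node(name, waitStatus);
        if (lastWaiter == null) firstWaiter = n; else lastWaiter.nextWaiter = n;
        lastWaiter = n;
    }

    // Stand-in for transferForSignal(): fails for nodes no longer in CONDITION state.
    boolean transfer(Node node) {
        if (node.waitStatus != CONDITION) return false;
        node.waitStatus = 0;
        syncQueue.add(node.name);
        return true;
    }

    void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;        // first was the only node: queue is now empty
            first.nextWaiter = null;      // detach first from the condition queue
        } while (!transfer(first) && (first = firstWaiter) != null);
    }

    void signal() {
        Node first = firstWaiter;
        if (first != null) doSignal(first);
    }

    public static void main(String[] args) {
        DoSignalModel q = new DoSignalModel();
        q.append("A", CANCELLED);
        q.append("B", CONDITION);
        q.append("C", CONDITION);
        q.signal();   // skips cancelled A, transfers B
        System.out.println(q.syncQueue + " first=" + q.firstWaiter.name); // prints "[B] first=C"
    }
}
```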

    transferForSignal()

    /**
         * Transfers a node from a condition queue onto sync queue.
         * Returns true if successful.
         * @param node the node
         * @return true if successfully transferred (else the node was
         * cancelled before signal)
         */
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
    /**
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    

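    As the name suggests, this method moves a node from the condition queue to the sync queue.
    If the node's waitStatus cannot be changed from CONDITION to 0 by CAS, the node has been cancelled and the method returns false. On success the node's waitStatus is 0 and execution continues.
    The node is appended to the sync queue with enq, which returns its predecessor.
    The predecessor may already be cancelled (ws > 0), or the CAS that sets its waitStatus to SIGNAL may fail, for example because it was cancelled in the meantime. In either case the node's thread is woken immediately with unpark. It is woken early so that it can resynchronise itself.

    At this point the signalling thread has finished signal. If unpark was called, the woken thread runs at the same time. It still cannot acquire the lock until the signalling thread calls unlock, and it parks again in acquireQueued while the lock is unavailable.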
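    The sketch below is a simplified model of `transferForSignal` together with `enq`. It assumes a hypothetical `TransferModel` class that uses `AtomicInteger`/`AtomicReference` instead of the AQS CAS helpers. Instead of touching real threads, it counts the places where AQS would call `LockSupport.unpark`. It shows three things: the first `enq` lazily creates a dummy head, `enq` returns the predecessor, and the thread is woken early when the predecessor is cancelled.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of transferForSignal() + enq(); counts early wakeups instead of parking threads.
class TransferModel {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;

    static final class Node {
        final AtomicInteger waitStatus;
        volatile Node prev, next;
        Node(int waitStatus) { this.waitStatus = new AtomicInteger(waitStatus); }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();
    int earlyWakeups = 0; // places where AQS would call LockSupport.unpark(node.thread)

    // Like enq(): create a dummy head on first use, append at the tail, return the predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                if (head.compareAndSet(null, new Node(0)))
                    tail.set(head.get());
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    boolean transferForSignal(Node node) {
        if (!node.waitStatus.compareAndSet(CONDITION, 0))
            return false;                       // the wait was already cancelled
        Node p = enq(node);
        int ws = p.waitStatus.get();
        if (ws > 0 || !p.waitStatus.compareAndSet(ws, SIGNAL))
            earlyWakeups++;                     // predecessor cancelled or CAS failed
        return true;
    }

    public static void main(String[] args) {
        TransferModel m = new TransferModel();
        Node a = new Node(CONDITION);
        System.out.println(m.transferForSignal(a));           // true: dummy head created, a appended
        System.out.println(m.transferForSignal(new Node(0))); // false: already cancelled
        a.waitStatus.set(CANCELLED);                          // pretend a's thread gave up
        boolean third = m.transferForSignal(new Node(CONDITION));
        System.out.println(third + " " + m.earlyWakeups);     // prints "true 1"
    }
}
```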

    doSignalAll()

    /**
             * Removes and transfers all nodes.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    

    The "All" means every node in the condition queue is transferred to the sync queue.

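    That completes signal (signalAll). Nodes are transferred from the condition queue to the sync queue, and in the cases above their threads are also unparked early.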
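    Both kinds of signalling can be observed from outside through the monitoring methods of `ReentrantLock`. `getWaitQueueLength(condition)` counts threads still waiting on the condition, and `getQueueLength()` reports threads waiting in the sync queue. In this sketch, which uses a hypothetical class `SignalVsSignalAllDemo`, three threads wait on the condition. While the main thread holds the lock, `signal()` moves one node and `signalAll()` moves the remaining two.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: watch nodes move from the condition queue to the sync queue.
class SignalVsSignalAllDemo {
    // Returns {condition queue after signal, sync queue after signal,
    //          condition queue after signalAll, sync queue after signalAll}.
    static int[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = {false}; // guarded by lock
        Thread[] waiters = new Thread[3];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!go[0])
                        cond.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            });
            waiters[i].start();
        }

        int[] r = new int[4];
        while (true) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(cond) == 3) {   // all three are inside await()
                    go[0] = true;
                    cond.signal();                          // transfers the first waiter only
                    r[0] = lock.getWaitQueueLength(cond);
                    r[1] = lock.getQueueLength();
                    cond.signalAll();                       // transfers the remaining two
                    r[2] = lock.getWaitQueueLength(cond);
                    r[3] = lock.getQueueLength();
                    break;
                }
            } finally {
                lock.unlock();   // the transferred threads can now take the lock one by one
            }
            Thread.sleep(1);
        }
        for (Thread w : waiters) w.join();
        return r;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run())); // prints [2, 1, 0, 3]
    }
}
```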

    await() #2

    However it is woken, the thread parked inside await eventually resumes and carries on:

          public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                // Part 1
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // Part 2
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

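    First it checks whether the thread was interrupted while waiting and, if so, when.
    An interrupt that arrives before the signal yields THROW_IE. An interrupt that arrives after the node has been signalled (moved to the sync queue) yields REINTERRUPT.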

    /**
             * Checks for interrupt, returning THROW_IE if interrupted
             * before signalled, REINTERRUPT if after signalled, or
             * 0 if not interrupted.
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
    
          /**
         * Transfers node, if necessary, to sync queue after a cancelled wait.
         * Returns true if thread was cancelled before being signalled.
         *
         * @param node the node
         * @return true if cancelled before the node was signalled
         */
        final boolean transferAfterCancelledWait(Node node) {
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                // Interrupted while the node is still in the condition queue: move it to the sync queue.
                enq(node);
                return true;
            }
            /*
             * If we lost out to a signal(), then we can't proceed
             * until it finishes its enq().  Cancelling during an
             * incomplete transfer is both rare and transient, so just
             * spin.
             */
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
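    The choice between THROW_IE and REINTERRUPT comes down to a single CAS on the node's waitStatus. `transferForSignal` and `transferAfterCancelledWait` both try to change it from CONDITION to 0, and only one can succeed. The sketch below uses a hypothetical class `SignalVsInterruptRace` to race two threads on an `AtomicInteger`. It shows that exactly one CAS wins per trial. If the interrupt side wins, the result corresponds to THROW_IE; otherwise it corresponds to REINTERRUPT. Which side wins varies from run to run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: two threads race to CAS the same waitStatus from CONDITION to 0,
// the way transferForSignal() and transferAfterCancelledWait() do in AQS.
class SignalVsInterruptRace {
    static final int CONDITION = -2;

    // Returns {signalWins, interruptWins}; the two always add up to `trials`.
    static int[] race(int trials) throws InterruptedException {
        int signalWins = 0;
        int interruptWins = 0;
        for (int i = 0; i < trials; i++) {
            AtomicInteger waitStatus = new AtomicInteger(CONDITION);
            CountDownLatch start = new CountDownLatch(1);
            boolean[] won = new boolean[2];

            Thread signaller = new Thread(() -> {
                awaitQuietly(start);
                won[0] = waitStatus.compareAndSet(CONDITION, 0); // first step of transferForSignal()
            });
            Thread canceller = new Thread(() -> {
                awaitQuietly(start);
                won[1] = waitStatus.compareAndSet(CONDITION, 0); // first step of transferAfterCancelledWait()
            });
            signaller.start();
            canceller.start();
            start.countDown();          // release both threads at roughly the same moment
            signaller.join();
            canceller.join();

            if (won[0] == won[1])
                throw new IllegalStateException("exactly one CAS must succeed");
            if (won[0]) signalWins++;   // signal first    -> REINTERRUPT
            else interruptWins++;       // interrupt first -> THROW_IE
        }
        return new int[] {signalWins, interruptWins};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = race(1000);
        System.out.println("signal first: " + r[0] + ", interrupt first: " + r[1]);
    }
}
```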
    

    acquireQueued()

     /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

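    With the interrupt mode recorded, the thread tries to reacquire the lock with savedState, which restores its reentrancy count. If it cannot get the lock yet, it parks again.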

                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
    

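    Once the thread has been woken again and holds the lock, it clears cancelled nodes out of the condition queue. Finally, depending on the interrupt mode, it either throws InterruptedException (THROW_IE) or re-sets the thread's interrupt status (REINTERRUPT).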
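    The two outcomes of `reportInterruptAfterWait` are visible through the public API. In this sketch, which uses a hypothetical class `InterruptModeDemo`, the waiter calls await once. It deliberately skips a predicate loop so that the way await ends can be observed. Interrupting before signal makes await throw `InterruptedException` (THROW_IE). Signalling first and then interrupting makes await return normally with the interrupt status set (REINTERRUPT).

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class showing the THROW_IE and REINTERRUPT outcomes of await().
class InterruptModeDemo {
    static String run(boolean signalBeforeInterrupt) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        String[] outcome = {null};

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                cond.await();   // a single await() on purpose, so we can see how it ends
                outcome[0] = "returned, interrupted=" + Thread.currentThread().isInterrupted();
            } catch (InterruptedException e) {
                outcome[0] = "InterruptedException";
            } finally {
                lock.unlock();  // await() re-acquires the lock before returning or throwing
            }
        });
        waiter.start();

        while (true) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {          // the waiter has entered await() and released the lock
                    if (signalBeforeInterrupt)
                        cond.signal();                // node moves to the sync queue first
                    waiter.interrupt();
                    break;
                }
            } finally {
                lock.unlock();
            }
            Thread.sleep(1);
        }
        waiter.join();
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(false)); // prints "InterruptedException" (THROW_IE)
        System.out.println(run(true));  // prints "returned, interrupted=true" (REINTERRUPT)
    }
}
```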

    With that, await() is complete.

    Summary

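    The await and signal operations of a Condition are tied to the state of the lock, and many of their internal steps must run while the lock is held. That is why await and signal have to be called between lock and unlock.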
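    A quick check of what happens when the calls are made outside lock/unlock. In the JDK implementations, `signal()` checks `isHeldExclusively()`. `await()` fails when it tries to release a lock it does not own; in the JDK 8 code analysed here, that happens inside `fullyRelease`/`tryRelease`. Both failures surface as `IllegalMonitorStateException`. The class name `NeedsLockDemo` is hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: Condition methods called without holding the lock fail fast.
class NeedsLockDemo {
    static boolean signalWithoutLockFails() {
        Condition cond = new ReentrantLock().newCondition();
        try {
            cond.signal();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;   // rejected by the isHeldExclusively() check
        }
    }

    static boolean awaitWithoutLockFails() throws InterruptedException {
        Condition cond = new ReentrantLock().newCondition();
        try {
            cond.await();  // never blocks: releasing a lock we do not own throws first
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(signalWithoutLockFails() + " " + awaitWithoutLockFails()); // prints "true true"
    }
}
```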

    The four methods (lock, unlock, await and signal) need to be understood together.

    Key points

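    ReentrantLock is built on AQS, which uses two queues (sync and condition), two node modes (EXCLUSIVE and SHARED; ReentrantLock only uses EXCLUSIVE), and five waitStatus values (SIGNAL, CONDITION, CANCELLED, PROPAGATE, 0).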

    The sync queue is a doubly linked list with head and tail pointers; its fields are:

        private transient volatile Node head;
        private transient volatile Node tail;
        volatile Node prev;
        volatile Node next;
    

    The condition queue is a singly linked list with head and tail pointers; its fields are:

            private transient Node firstWaiter;
            private transient Node lastWaiter;
            Node nextWaiter;
    

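    lock() essentially adds a node that failed to get the lock to the sync queue.
    unlock() essentially wakes the nodes in the sync queue one after another.
    await() adds the node to the condition queue and fully releases the lock.
    signal() moves the first node of the condition queue (all nodes for signalAll) to the sync queue.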


          Article link: https://www.haomeiwen.com/subject/sdooiktx.html