美文网首页
Condition源码解读

Condition源码解读

作者: 7d972d5e05e8 | 来源:发表于2020-05-05 15:31 被阅读0次

    Condition是一个接口,其提供的就两个核心方法,await和signal方法。分别对应着Object的wait和notify方法。调用Object对象的这两个方法,需要在同步代码块里面,即必须先获取到锁才能执行这两个方法。同理,Condition调用这两个方法,也必须先获取到锁。只不过Object的锁是synchronized,而Condition是Lock锁。这里先声明两个概念:

    1. 等待队列:Condition上用来等待这个条件的队列,它是个单向队列
    2. 同步队列:就是AQS等待拿锁的CLH队列,它是个双向队列。
      下文涉及这两个概念,一定要注意区分。

    一、解读Condition的await方法

    当然是先上源码:

    public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

    1.1 第一步是addConditionWaiter方法。

    这个方法是为了构造等待Condition条件的等待队列。这个队列是单向链表,有别于等待Lock锁的同步队列(CLH队列,它是双向队列)。 这里一定要区别好这两个队列,虽然队列的元素都是Node实现的,但是两者的用途完全不一样。好了,我们看下这个方法:

    private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    

    其实这个方法很简单,看下最后的等待元素node是否已经取消了。如果取消了,那么就从队列里面清除。这里有个疑问:
    1. 为啥要判断最后一个node,而不是第一个,或第二个,第三个等等?难道最后一个元素,它特别香吗
    2. 为啥最后一个node取消了,unlinkCancelledWaiters要遍历所有等待队列,清除所有已取消元素?

    1.2 第二步是fullyRelease方法

    这个方法其实就是模拟Object.wait方法,用来释放锁的。如果重入过,要把它释放干净。所以看名字fullyRelease,起的挺好的很直观。源码我们也来看一下:

    final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    fullyRelease是通过getState,然后全部release掉。

    1.3 第三步是isOnSyncQueue方法

    final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it.  It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             */
            return findNodeFromTail(node);
        }
    

    这个方法,乍一看莫名其妙。刚刚上面不是说了,node是单向队列吗,怎么用node.prev来判空。WT?单向队列的prev不是必空吗。其实这里的Node节点,已经被偷换了。 这个方法的Node,如果是第一次调用,那么确实还是Condition等待队列上node,它是单向队列,且它的waitStqatus = -2。这个方法在第一行的判断逻辑一定是true的。但是第2次,第3次调用,这个Node就不是等待队列的Node的,而是同步队列的Node,它是双向的,它有prev和next指针。具体是怎么悄悄就被转换了,第二节的signal方法会解释清楚的。
    咱们继续看,循环调用isOnSyncQueue方法,是怎么回事?

          while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
    

    第一次调用,一定会进入进来(上面已经解释了,不在赘述)。这里的park会阻塞自己,等待其他线程的唤醒。现在假如其他线程唤醒了该线程,它首先检查自己在阻塞期间,是否被中断过。如果没有中断过,继续执行isOnSyncQueue方法。上面也说了,第二次调用的时候,这个Node元素就变成了同步队列元素。再回头先下isOnSyncQueue方法,node.waitStatus一定不再等于-2了,且node.prev必不为null。其实这里为啥要判断node.prev == null 我也还没搞懂。然后第二个判断条件,node.next != null,大概率不为空。
    总结下:这个方法主要作用就是用来判断该node节点,没有加入到同步队列。为啥要判断有没有加入到同步队列呢?因为await的时候,你先释放了锁,现在唤醒了,你不是要重新获取锁嘛?和Object的wait效果一致。学习了!

    1.4 第四步是acquireQueued方法

    这个方法,其实就是被唤醒后,用来获取锁。其和Lock的流程就一模一样了,可以看以前的文章,有讲过,不在赘述。有个细节acquireQueued(node, savedState)的savedState,是之前fullyRealese留下来的,这里要全部拿回。释放了我多少,我就要拿回来多少。拿锁那一套,走Lock流程。

    该方法后面又来一次unlinkCancelledWaiters,咋这么喜欢清理等待队列呢?

    二、解读Condition的signal方法

    上源码,不上源码我怎么讲。

    public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    

    里面就一个核心方法doSignal。

    private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    

    里面也是一个核心方法transferForSignal

     final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

    这个方法的作用就是,把等待队列的Node元素,转换为同步队列的node元素。然后修改node的waitStatus 由 -2 变为0。
    compareAndSetWaitStatus(node, Node.CONDITION, 0) 这个动作就要要告诉node节点,你已经等到condition啦,你可以安全的去竞争锁了。然后就把node 执行enq方法。这个enq方法是不是很熟悉,就是把node加入到CLH锁的同步队列里面。所以这里就是转换的关键,也是上文提到的Node在第一次和非第一次执行时的不同。转换结束后,看下p节点(p是node的前置节点)。如果p节点已经取消了,或者p节点修改失败,就唤醒当前node节点的线程。搞不懂为啥要这么做,还没想明白?因为同步队列里面,可能还有有其他很多很多线程等待着锁呢,这里仅仅根据node的前置节点状态去判断,就要唤醒node线程,唤醒了又能怎么样呢?醒了后,发现自己不是head的next节点,还是要继续阻塞等待,还不如不唤醒。

    总结:Condition所有的操作,就是为了实现Object的wait和notify的功能,实现线程间更优雅的通信。

    吃饭去了。。。

    相关文章

      网友评论

          本文标题:Condition源码解读

          本文链接:https://www.haomeiwen.com/subject/otymghtx.html