美文网首页Java程序员
CountDownLatch 源码浅析

CountDownLatch 源码浅析

作者: tomas家的小拨浪鼓 | 来源:发表于2018-02-16 21:33 被阅读250次


CountDownLatch 介绍

CountDownLatch是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。

CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值(count)由于countDown方法的调用达到0,在这之后(即,count为0之后)所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象 ———— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。

CountDownLatch是一个通用的同步工具,它能用于许多用途。一个使用’1’计数值初始化的CountDownLatch服务作为一个简单的开关门:所有执行await的线程等待在门口,直到某个执行countDown方法的线程将门打开。一个使用‘N(count)’初始化的CountDownLatch能被用于使一个线程等待,直到N个线程完成某些动作,或者某些动作已经完成N次。

CountDownLatch一个很有用的性质是,它不要求你在可以继续进行之前调用countDown方法等待count到达0,它只是简单的防止任何线程超过await方法直到所有的线程都可以通过。
也就是说,你可以在任意时刻调用await,如果当前的count值非0,那么线程会等待直到count为0时才会继续往下执行,否则如果count值为0,await方法会立即返回,你可以不被阻塞的继续往下执行。

内存一致性作用:直到count到达0,一个线程调用countDown()方法之前的动作 happen-before 从另一个线程相应的await()方法返回之后的动作。
比如,threadB.await()、threadA.countDown(),那么threadA执行countDown()之前的动作,对于threadB的await()方法之后的动作都可见(当count为0时,threadB会从await()方法的阻塞中结束而继续往下执行)。

AbstractQueuedSynchronizer

因为CountDownLatch是使用AbstractQueuedSynchronizer(AQS)的state来实现其同步控制的。CountDownLatch使用的是共享锁模式,由于AQS除了共享锁模式还有排他锁模式,本文仅对CountDownLatch涉及到的共享锁模式部分的内容进行介绍,关于排他锁模式的部分会在 ReentrantLock 源码浅析一文中介绍。

AQS提供一个框架用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和同步器(信号量,事件等)。这个类被设计与作为一个有用的基类,一个依赖单一原子值为代表状态的多种同步器的基类。子类必须将修改这个状态值的方法定义为受保护的方法,并且该方法会根据对象(即,AbstractQueuedSynchronizer子类)被获取和释放的方式来定义这个状态。根据这些,这个类的其他方法实现所有排队和阻塞的机制。子类能够维护其他的状态属性,但是只有使用『getState』方法、『setState』方法以及『compareAndSetState』方法来原子性的修改 int 状态值的操作才能遵循相关同步性。

等待队列节点类 ——— Node

等待队列是一个CLH锁队列的变体。CLH通常被用于自旋锁(CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。)。我们用它来代替阻塞同步器,但是使用相同的基本策略,该策略是持有一些关于一个线程在它前驱节点的控制信息。一个“status”字段在每个节点中用于保持追踪是否一个线程需要被阻塞。一个节点会得到通知当它的前驱节点被释放时。队列中的每一个节点都作为一个持有单一等待线程的特定通知风格的监视器。状态字段不会控制线程是否被授予锁等。一个线程可能尝试去获取锁如果它在队列的第一个。但是首先这并不保证成功,它只是给与了竞争的权力(也就是说,队列中第一个线程尝试获取锁时,并不保证一定能得到锁,它只是有竞争锁的权力而已)。所以当前被释放的竞争者线程可能需要重新等待获取锁。
(这里说的"队列中的第一个的线程"指的时,从队列头开始往下的节点中,第一个node.thread != null的线程。因为,AQS队列的head节点是一个虚节点,不是有个有效的等待节点,因此head节点的thread是为null的。)

为了排队进入一个CLH锁,你可以原子性的拼接节点到队列中作为一个新的队尾;对于出队,你只要设置头字段。(即,入队操作时新的节点会排在CLH锁队列的队尾,而出队操作就是将待出队的node设置为head。由此可见,在AQS中维护的这个等待队列,head是一个无效的节点。初始化时head是一个new Node()节点;在后期的操作中,需要出队的节点就会设置到head中。)

          +------+  prev +-----+       +-----+
     head |      | <---- |     | <---- |     |  tail
          +------+       +-----+       +-----+

插入到一个CLH队列的请求只是一个对“tail”的单个原子操作,所以有一个简单的从未入队到入队的原子分割点。类似的,出队调用只需要修改“head”。然而,节点需要更多的工作来确定他们的后继者是谁,部分是为了处理由于超时和中断而导致的可能的取消。
(也就是说,一个node的后继节点不一定就是node.next,因为队列中的节点可能因为超时或中断而取消了,而这些取消的节点此时还没被移除队列(也许正在移除队列的过程中),而一个node的后继节点指的是一个未被取消的有效节点,因此在下面的操作中你就会发现,在寻找后继节点时,寻找的都是当前节点后面第一个有效节点,即非取消节点。)

“prev”(前驱)连接(原始的CLH锁是不使用前驱连接的),主要用于处理取消。如果一个节点被取消了,它的后驱(通常)会重连接到一个未被取消的前驱。

另外我们使用“next”连接去实现阻塞机制。每个节点的线程ID被它们自己的节点所持有,所以前驱节点通知下一个节点可以被唤醒,这是通过遍历下一个链接(即,next字段)来确定需要唤醒的线程。后继节点的决定必须同‘新入队的节点在设置它的前驱节点的“next”属性操作(即,新入队节点为newNode,在newNode的前驱节点preNewNode进行preNewNode.next = newNode操作)’产生竞争。一个解决方法是必要的话当一个节点的后继看起来是空的时候,从原子更新“tail”向前检测。(或者换句话说,next链接是一个优化,所以我们通常不需要反向扫描。)

取消引入了对基本算法的一些保守性。当我们必须为其他节点的取消轮询时,我们不需要留意一个取消的节点是在我们节点的前面还是后面。它的处理方式是总是根据取消的节点唤醒其后继节点,允许它们去连接到一个新的前驱节点,除非我们能够标识一个未被取消的前驱节点来完成这个责任。

  • waitStatus
volatile int waitStatus;

状态属性,只有如下值:
① SIGNAL:
static final int SIGNAL = -1;
这个节点的后继(或者即将被阻塞)被阻塞(通过park阻塞)了,所以当前节点需要唤醒它的后继当它被释放或者取消时。为了避免竞争,获取方法必须首先表示他们需要一个通知信号,然后再原子性的尝试获取锁,如果失败,则阻塞。
也就是说,在获取锁的操作中,需要确保当前node的preNode的waitStatus状态值为’SIGNAL’,才可以被阻塞,当获取锁失败时。(『shouldParkAfterFailedAcquire』方法的用意就是这)
② CANCELLED:
static final int CANCELLED = 1;
这个节点由于超时或中断被取消了。节点不会离开(改变)这个状态。尤其,一个被取消的线程不再会被阻塞了。
③ CONDITION:
static final int CONDITION = -2;
这个节点当前在一个条件队列中。它将不会被用于当做一个同步队列的节点直到它被转移到同步队列中,转移的同时状态值(waitStatus)将会被设置为0。(这里使用这个值将不会做任何事情与该字段其他值对比,只是为了简化机制)。
④ PROPAGATE:
static final int PROPAGATE = -3;
一个releaseShared操作必须被广播给其他节点。(只有头节点的)该值会在doReleaseShared方法中被设置去确保持续的广播,即便其他操作的介入。
⑤ 0:不是上面的值的情况。
这个值使用数值排列以简化使用。非负的值表示该节点不需要信号(通知)。因此,大部分代码不需要去检查这个特殊的值,只是为了标识。
对于常规的节点该字段会被初始化为0,竞争节点该值为CONDITION。这个值使用CAS修改(或者可能的话,无竞争的volatile写)。

  • prev
volatile Node prev

连接到前驱节点,当前节点/线程依赖与这个节点waitStatus的检测。分配发生在入队时,并在出队时清空(为了GC)。并且,一个前驱的取消,我们将短路当发现一个未被取消的节点时,未被取消的节点总是存在因为头节点不能被取消:只有在获取锁操作成功的情况下一个节点才会成为头节点。一个被取消的线程绝不会获取成功,一个线程只能被它自己取消,不能被其他线程取消。

  • next
volatile Node next

连接到后继的节点,该节点是当前的节点/线程释放唤醒的节点。分配发生在入队时,在绕过取消的前驱节点时进行调整,并在出队列时清空(为了GC的缘故)。一个入队操作(enq)不会被分配到前驱节点的next字段,直到tail成功指向当前节点之后(通过CAS来将tail指向当前节点。『enq』方法实现中,会先将node.prev = oldTailNode;在需要在CAS成功之后,即tail = node之后,再将oldTailNode.next = node;),所以当看到next字段为null时并不意味着当前节点是队列的尾部了。无论如何,如果一个next字段显示为null,我们能够从队列尾向前扫描进行复核。被取消的节点的next字段会被设置为它自己,而不是一个null,这使得isOnSyncQueue方法更简单。

  • thread
volatile Thread thread

这个节点的入队线程。在构建时初始化,在使用完后清除。

  • nextWaiter
Node nextWaiter

链接下一个等待条件的节点,或者一个指定的SHARED值。因为只有持有排他锁时能访问条件队列,所以我们只需要一个简单的单链表来维持正在等待条件的节点。它们接下来会被转换到队列中以去重新获取锁。因为只有排他锁才有conditions,所以我们使用给一个特殊值保存的字段来表示共享模式。
也就是说,nextWaiter用于在排他锁模式下表示正在等待条件的下一个节点,因为只有排他锁模式有conditions;所以在共享锁模式下,我们使用’SHARED’这个特殊值来表示该字段。

源码分析

初始化
CountDownLatch doneSignal = new CountDownLatch(N);

CountDownLatch 使用了共享锁模式。CountDownLatch 使用了一个内部类 Sync来实现CountDownLatch的同步控制,而Sync是AQS的一个实现类,它使用AQS的状态(state)来表示count。

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

构造一个CountDownLatch使用给定的count值进行初始化。
count值最终是设置到sync(AbstractQueuedSynchronizer)里的state字段。

阻塞的流程分析

『await()』

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

导致当前的线程等待直到latch被倒数到0,或者线程被中断了。
如果当前的count是0,那么方法会立即返回,并且返回值为true。
如果当前的count大于0,则当前线程因为线程调度而变得不可用,并且处于休眠状态,直到发生下面二件事之一:
① 由于countDown方法的调用当前的count达到0;
如果count达到0,那么这个方法将返回true。
② 其他线程中断了当前的线程;
如果当前线程在进入这个方法时设置了中断状态;或者当前线程在等待时被设置了中断状态,那么“InterruptedException”异常将会抛出,并且当前的线程的中断状态会被清除。


『acquireSharedInterruptibly』

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

获取一个共享模式锁,如果发生中断则异常终止。实现首先会检查中断的状态,然后执行至少一次的tryAcquireShared,成功的话返回。否则,线程将会入队,可能会重复的阻塞和解阻塞,执行tryAcquireShared直到成功或者线程被中断。
① 首先判断当前的线程是否被标志为了中断,如果被标志位了中断,则抛出“InterruptedException”异常,并清除中断标志;否则到第②步;
② 执行『tryAcquireShared』来尝试获取锁,如果成功(即,返回>=0)。则返回true退出方法;否则到第③步
③ 执行doAcquireSharedInterruptibly。


『doAcquireSharedInterruptibly』

① 创建一个共享模式的节点,并将这个节点加入到等待队列中。
② 获取新创建好节点的前驱节点。如果前驱节点是head节点,则说明当前节点是队列中第一个等待获取锁的节点,那么就执行『tryAcquireShared』方法尝试获取共享锁。tryAcquireShared是由CountDownLatch重写的方法。具体实现下面会详细说明。这里先给出结果就是tryAcquireShared方法的返回值会小于0.也就说获取共享锁失败。进入步骤③
③ 如果前驱节点不是head节点,或者当前节点获取共享锁失败(即,步骤②)。那么执行『shouldParkAfterFailedAcquire』方法,该方法返回true则说明本次获取共享锁失败需要阻塞(挂起)当前线程。接着执行『parkAndCheckInterrupt』方法,该方法会将当前线程挂起,直到被唤醒。
这就是阻塞情况下的一个主流程,可以知道的是,在这个逻辑过程中使用了大量的CAS来进行原子性的修改,当修改失败的时候,是会通过for(;;)来重新循环的,也就是说『doAcquireSharedInterruptibly』使用自旋锁(自旋+CAS)来保证在多线程并发的情况下,队列节点状态也是正确的以及在等待队列的正确性,最终使得当前节点要么获取共享锁成功,要么被挂起等待唤醒。


下面我们来对阻塞情况下,涉及的方法进行进一步的展开。
『addWaiter』
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

根据给定的模式创建当前线程的节点,并将创建好的节点入队(加入等待队列尾部)。
首先在队列非空的情况下会尝试一次快速入队,也就是通过尝试一次CAS操作入队,如果CAS操作失败,则调用enq方法进行“自旋+CAS”方法将创建好的节点加入队列尾。
在共享模式下,Node的mode(即,waitStatus)为’SHARED’。waitStatus是用于在排他锁模式下当节点处于条件队列时表示下一个等待条件的节点,所以在共享锁模式下,我们使用’SHARED’这个特殊值来表示该字段。


『enq』

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

使用自旋锁的方式(自旋+CAS)插入节点到等待队列,如果等待队列为空则初始化队列。
初始化队列:创建一个空节点(即,new Node()),将head和tail都指向这个节点。
然后才是将我们待插入的节点插入,即:emptyNode -> newNode. head指向emptyNode,tail指向newNode。


『tryAcquireShared』
在共享模式下尝试获取。这个方法需要查询是否对象的状态允许在共享模式下被获取,如果允许则去获取它。
这个方法总是被线程执行获取共享锁时被调用。如果这个方法报告失败,那么获取方法可能会使线程排队等待,如果它(即,线程)还没入队的话,直到其他的线程发出释放的信号。
默认实现抛出一个“UnsupportedOperationException”
返回:
a)< 0 : 一个负数的返回表示失败;
b) 0 : 0表示在共享模式下获取锁成功,但是后续的获取共享锁将不会成功
c)> 0 : 大于0表示共享模式下获取锁成功,并且后续的获取共享锁可能也会成功,在这种情况下后续等待的线程必须检查是否有效。

CountDownLatch对该方法进行了重写:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

如果当前的状态值为0(即,count为0),则表示获取成功(返回’1’);否则表示获取失败(返回’-1’)


『shouldParkAfterFailedAcquire』

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

检查并修改一个节点的状态,当该节点获取锁失败时。返回true如果线程需要阻塞。这是主要的信号通知控制在所有的获取锁循环中。要求’pred’ == ‘node.prev’
① 如果pred.waitStatus == Node.SIGNAL。则说明node的前驱节点已经被要求去通知释放它的后继节点,所以node可以安全的被挂起(park)。然后,退出方法,返回true。
② 如果pred.waitStatus > 0。则说明node的前驱节点被取消了。那么跳过这个前驱节点并重新标志一个有效的前驱节点(即,waitStatus <= 0 的节点可作为有效的前驱节点),然后,退出方法,返回false。
③ 其他情况下,即pred.waitStatus为’0’或’PROPAGATE’。表示我们需要一个通知信号(即,当前的node需要唤醒的通知),但是当前还不能挂起node。调用『compareAndSetWaitStatus(pred, ws, Node.SIGNAL)』方法通过CAS的方式来修改前驱节点的waitStatus为“SIGNAL”。退出方法,返回false。
我们需要一个通知信号,主要是因为当前线程要被挂起了(park)。而如果waitStatus已经是’SIGNAL’的话就无需修改,直接挂起就好,而如果waitStatus是’CANCELLED’的话,说明prev已经被取消了,是个无效节点了,那么无需修改这个无效节点的waitStatus,而是需要先找到一个有效的prev。因此,剩下的情况就只有当waitStatus为’0’和’PROPAGAET’了(注意,waitStatus为’CONDITION’是节点不在等待队列中,所以当下情况waitStatus不可能为’CONDITION’),这是我们需要将prev的waitStatus使用CAS的方式修改为’SIGNAL’,而且只有修改成功的情况下,当前的线程才能安全被挂起。
还值得注意的时,因此该方法的CAS操作都是没有自旋的,所以当它操作完CAS后都会返回false,在外层的方法中会使用自旋,当发现返回的是false时,会再次调用该方法,以检查保证有当前node有一个有效的prev,并且其waitStatus为’SIGNAL’,在此情况下当前的线程才会被挂起(park)。

释放的流程分析

『countDown』

public void countDown() {
    sync.releaseShared(1);
}

减小latch的count,如果count达到0则释放所有正在等待的线程。
如果当前的count大于0,那么减少count。如果减少后的count值为0,那么所有正在等待的线程因为线程调度的原因被重新启用。
如果当前的count值已经是0了,那么什么都不会发生。


『releaseShared』

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

共享模式下的释放。如果『tryReleaseShared』返回true的话,会使一个或多个线程重新启动。


『tryReleaseShared』
在共享模式下,尝试去设置状态来反映一个释放。
这个方法总是在线程执行释放时被调用。
默认实现抛出一个UnsupportedOperationException异常。
返回:如果当前共享模式可能允许一个正在等待的获取成功(正在等待的获取可能是共享模式的,也可能是排他模式的),则返回true;否则,返回false。

CountDownLatch对该方法进行了重写:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

减少count的值,如果count为0则发出释放信号。
这里使用了"自旋+CAS”的方式来原子性的将state的值减少1,如果在此过程中state已经为0了(在并发情况下,可能已经被其他线程修改为了0),则返回false。否则根据修改后state的值是否等于0来返回boolean值。


『doReleaseShared』

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

共享模式下的释放动作 ———— 用信号通知后继节点并且确保广播。(注意:在排他锁模式下,释放只是相当于调用head的unparkSuccessor方法如果它需要通知唤醒的话。)
确保一个释放的广播,即使有其他线程正在进行获取/释放锁。这个过程通常的方式是尝试head的unparkSuccessor操作如果需要通知释放的话。如果没这么做,状态会被设置为‘PROPAGATE’以确保在释放,广播继续。此外,当我们正在做这个操作的时候如果新的节点被添加的话,我们需要重新循环再进行一次该操作。另外,不同于unparkSuccessor的其他用途,我们需要知道CAS重置状态是否失败,如果失败则重新检查。

在队列非空的时候,该方法会释放head的后继节点,如果该节点可以被释放的话。『(h != null && h != tail)』表示队列非空,即有等待获取锁的节点;『(h == head)』表示,已经操作完释放后继节点,或者队列已经空了(即,『(h == null || h == tail)』),那么就退出循环。否则如果循环过程中(即,『h != head』),头结点发生了变化,则重新循环。
如果『if (h != null && h != tail)』为true,那么:
① 如果head的waitStatus为’SIGNAL’,则说明head的后继节点可被通知释放,那么执行CAS操作将head.waitStatus修改为’0’,如果成功,则执行『unparkSuccessor』对head的后继节点进行释放操作,如果CAS操作失败,则说明发送了多线程竞争(即,此时有其他线程也在修改head的waitStatus状态值),那么重新循环检查。
② 如果head的waitStatus为’0’,则使用CAS的方式将其修改为’PROPAGATE’。如果CAS操作失败,则说明发生了多线程竞争,那么重新循环检查。
③ 如果上面的两个操作中有一个成功了,就会走到“if (h == head)”这一步,并且此时head节点没有发生变化,则退出循环,操作结束。否则,说明head节点发生变化了,那么重新循环检查。
『if (h != null && h != tail)』为false,那么:
说明队列中没有等到获取锁的节点。会直接到“if (h == head)”,如果此时head节点没有发生变化,则直接退出循环,操作结束。如果此时head节点发生了变化,那么重新循环检查。
也就是说,该方法在等待队列非空时(即,存在一个有效的等待节点,头结点不是有效节点),会根据head的waitStatus进行后续的操作。
a) 如果『ws == Node.SIGNAL』,则说明需要释放head后继节点,如果此时CAS操作『compareAndSetWaitStatus(h, Node.SIGNAL, 0)』也成功的话(说明,此时没有其他线程在修改head的waitStatus),那么就会执行『unparkSuccessor(h);』来释放head的后继节点。
b) 如果『ws != Node.SIGNAL』并且『ws == 0』,则通过CAS操作将head的waitStatus修改为’PROPAGATE’。
以上两步,当CAS失败,也就是有其他线程也在修改head的waitStatus状态时,需要继续循环进行重新检测,如果head节点改变了也需要继续循环重新检测。

Q:关于node的waitStatus为’0’的情况?
A:当节点不属于任何waitStatus的话,就会是0。比如,创建好的节点。比如,原来是SIGNAL状态,在执行完unparkSuccessor操作后(逻辑上说是执行完unparkSuccessor后,但实际的代码实现必须先将node的waitStatus通过CAS成功从SINGAL修改为0后,才可执行unparkSuccessor操作,以保证多线程竞争情况下的正确性)。比如,将节点从条件队列转移到等待队列的时候,会通过CAS将node的waitStatus从’CONDITION’修改为0。

Q:’PROPAGATE’状态与释放之间的关系?
A:当head的waitStatus为’PROPAGATE’的话,在释放操作时,这个释放会被广播下去,也就是说,第一个线程被释放完后,会继续释放第二个被阻塞的线程。。。


『unparkSuccessor』

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

唤醒后继节点,如果存在的话
① 如果状态值是负数,则在预期发信号通知时清除这个负数状态值。如果状态被等待的线程修改了或者清除负数状态值失败是允许。
② 后继节点的线程被唤醒,后继节点通常就是下一个节点。但是如果下一个节点被取消了或者下一个节点为null,则从队列尾(tail)往前遍历去找真实的未取消的后继节点。
『(s == null || s.waitStatus > 0)』:说明下一个节点为null或被取消了(waitStatus允许的状态值中,只有’CANCELLED’是>0的)。那么,就从队列尾(tail)开始向前遍历,获取第一个非空且未被取消的节点。如果存在这样的一个后继节点的话(即,“s != null”),则执行『LockSupport.unpark(s.thread);』操作来唤醒这个节点的线程。
Q:关于node的waitStatus为’CANCELLED’的情况?
A:关于node的waitStatus为’CANCELLED’的情况:比如,当这个node被中断了,或者设置的超时时间到了,那么说明这个线程获取锁失败,那么此时就应该将其设置为cancelled,因为如果该线程还需要获取锁的话,会重新调用获取锁的方法,而获取锁的方法就是创建一个新的node的。所以,那么线程获取锁失败的时候就会将这个node的waitStatus设置为’CANCELLED’,一个被取消的线程绝不会获取锁成功,一个线程只能被它自己取消,不能被其他线程取消。

Q:关于node为null的情况?
A:关于node为null的情况:比如,一个入队操作(enq)不会被分配到前驱节点的next字段,直到tail成功指向当前节点之后(通过CAS来将tail指向当前节点。『enq』方法实现中,会先将node.prev = oldTailNode;在需要在CAS成功之后,即tail = node之后,再将oldTailNode.next = node;),所以当看到next字段为null时并不意味着当前节点是队列的尾部了。无论如何,如果一个next字段显示为null,我们能够从队列尾向前扫描进行复核。


当调用了『LockSupport.unpark(s.thread);』操作后,等待队列中第一个等待的线程就会重新启动。流程回到『doAcquireSharedInterruptibly』方法中,线程从阻塞中恢复:

第一个被释放的线程从『parkAndCheckInterrupt』方法中的『LockSupport.park(this)』挂起结束,继续后面的流程。因为此时是正常的被唤醒流程,线程并没有被设置中断标志,因此『parkAndCheckInterrupt』会返回false。流程重新开始循环。并且通过『Node p = node.predecessor()』为head,接着执行『tryAcquireShared』方法,此时的count==0,所以该方法也会返回’1’,表示获取共享锁成功。接着通过『setHeadAndPropagate』将当前节点设置为头节点并进行广播如果需要的话。然后将p(即,旧的head节点)的next置null,有助于p被垃圾收集器收集。然后标识failed为false。结束方法调用,返回true。


『setHeadAndPropagate』
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

设置’node’节点为头结点,并且检查’node’节点的后继是否正在等待获取共享锁,如果是的话,并且'propagate > 0'或者’node’的waitStatus被设置成了’PROPAGATE’,则广播。
① 设置’node’为head节点
② 尝试通知队列中的下一个节点,如果:
  [1]
   a) 调用者标识了广播(即,propagate > 0),
   b) 或者waitStatus被前面的操作重新记录了(’h.waitStatus’可能在setHead之前或之后被重新记录)(注意,这里使用waitStatus的符号检查,因为PROPAGATE状态可能被转换为SIGNAL)。
  并且[2]队列中下一个等待的节点是共享模式的,或者下一个节点为null。
这两次检查的保守性可能导致不必要的唤醒,但是只有当多线程竞争获取/释放锁时,所以大多数情况下现在或即将需要通知(signal)唤醒。(因为在enq新节点入队过程中,可能出现next为null的短暂现象,这是发现在节点入队的过程中,随后节点就会入队成功,next字段就不会为null了。所以这里将next为null的情况也考虑了,在广播释放时,会将这个正在入队的节点对应的线程也进行释放)。
如果符合👆[1]、[2]个条件则执行『doReleaseShared()』来释放后继的节点。

可设置超时时间的await

『await(long timeout, TimeUnit unit)』同『await()』方法大体是相同的,主要多了在获取共享锁时对时间的控制。
在尝试获取锁时的区别:
① 如果传入的给定的超时纳秒数是否小于等于0,如果是则直接返回false,获取共享锁失败。
② 如果在使用自旋的方式获取共享锁的过程中,发现已经过了设置的超时时间,那么直接返回false,获取共享锁失败。
③ 如果当前线程无法获取当共享锁,并且『shouldParkAfterFailedAcquire』方法返回true(则说明本次获取共享锁失败需要阻塞/挂起当前线程)。但当『nanosTimeout <= spinForTimeoutThreshold』说明设置的超时时间 <= 自旋超时的阈值。这里spinForTimeoutThreshold的值为1000纳秒,表示当设置的超时时间小于1000纳秒时,使用自旋比使用线程挂起更快。粗略估算这足以去提升响应在一个很短的超时时间内。否则也是使用『LockSupport.parkNanos(this, nanosTimeout);』将当前线程挂起,直到被唤醒或者超时时间到。

取消节点

当尝试获取锁的节点,因为超时或中断而结束时,说明本次获取锁操作失败,因为本次操作的node就应该被取消。如果线程还需要获取锁的话,会再次尝试获取锁操作,此时如果需要的话是会生成一个新的node的。
『cancelAcquire』

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

① 如果待取消节点(node)为null,则直接返回。
② 将node的thread置为null;
③ 将node的prev属性指向一个在它之前的有效的节点(即,waitStatus <= 0的节点都为有效节点)。 也就是跳过被取消的前驱节点。
④ 『Node predNext = pred.next;』取pred的下一个节点。这个predNext是pred表面上的下一个连接的节点(即,无需考虑该节点是否被取消了)。下面的CAS操作将会失败(『compareAndSetNext(pred, predNext, null);』or『compareAndSetNext(pred, predNext, next);』),如果和其他的取消或通知操作发生竞争时,这时不需要进一步的操作。因为如果产生竞争,说明pred的next已经被修改了,并且是最新的值了,而我们的操作也就没有要执行的必要了。
⑤ 将node的waitStatus设置为’CANCELLED’。这里可以使用无条件的写代替CAS(注意,node的waitStatus是volatile的)。在这个原子操作之后,其他节点会跳过我们(即,跳过waitStatus被置位CANCELLED的节点),在这个原子操作之前,我们不受其他线程的干扰。也就是说,无论其他线程对node的waitStatus是否有在操作,在当前的情况下我们都需要将这个node的waitStatus置为’CANCELLED’。
⑥ 如果待取消的node节点是队列尾节点的话(即,『node == tail』),那么删除node自己即可。使用CAS将tail节点设置成前面得到的第一个有效前驱节点(即,『compareAndSetTail(node, pred)』)。并且CAS操作成功的话,执行『compareAndSetNext(pred, predNext, null);』也就是将tail的next置为null的意思。如果该CAS操作失败的话,没关系。说明此时tail已经被修改了。
⑦ 如果待取消的node节点不是队尾节点。并且:
a)pred(即,node的有效前驱节点)不是head节点;并且
b)“pred.waitStatus为SIGNAL” 或者 “pred.waitStatus <= 0”时通过CAS将pred.waitStatus设置为SIGNAL”成功;并且
c) pred的thread非空
那么,当node的next节点非空,且next节点的waitStatus<=0(说明next节点未被取消)时,通过CAS将pred的next执行node的next(即,pred.next = node.next)。同时,如果该CAS操作失败是没关系的,说明有其他线程操作已经修改了该pre的next值。
⑧ 如果待取消的node节点不是队尾节点,并且步骤[7]条件不成立。那么执行『unparkSuccessor(node);』来释放当前这个待取消节点的下一个节点。(也就是说,当prev是head节点,或者prev也被取消的话,会执行『unparkSuccessor(node);』来释放node的下一个节点,其实也就是pred的下一个节点)


从上面的分析我们可以知道,其实CountDownLatch中线程的释放其实是有顺序的,根据节点入队的顺序依次被释放,先入队的节点的线程会先被释放。

后记

如果文章有错不吝指教 :)

相关文章

网友评论

    本文标题:CountDownLatch 源码浅析

    本文链接:https://www.haomeiwen.com/subject/upiptftx.html