美文网首页一些收藏
CountDownLatch笔记

CountDownLatch笔记

作者: 12313凯皇 | 来源:发表于2020-07-05 13:22 被阅读0次

环境配置:
IDEA 2020.1
jdk 1.8

最近在看项目的时候接触到了CountDownLatch这个类,简单了解了下后发现它好像可以用于多图上传,于是抽空先详细的了解一下这个类的内部实现机制。

概述

简单理解就是CountDownLatch是一个带有计数器的门栓,它的构造函数需要我们传递一个count值,然后它有两个常用的方法:await()countDown(),当在某一线程调用了await()方法时,该线程会处于等待状态,直到计数器count的值减为0。而countDown()方法恰好可以使这个计数器减一,每调用一次计数器减一,当计数器为0时再调用此方法什么而也不会发生。另外await方法还有另外一个重载函数await(long timeout, TimeUnit unit),这个方法可以设置一个最长的等待时间,如果超出了这个时间而计数器还未减为0,则释放调用该方法的所在线程

CountDownLatch这个类看起来很简单,它对外暴露了几个方法,每个方法看起来都很简洁:

public class CountDownLatch {
    //构造方法
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

     public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

实现原理

从上面的代码可以看出,CountDownLatch的实现基于一个SyncSyncCountDownLatch的一个静态内部类,继承自AbstractQueuedSynchronizer。在AbstractQueuedSynchronizer中维护了一个volatile类型的整数statevolatile可以保证多线程环境下该变量的修改对每个线程都可见,并且由于该属性为整型,因而对该变量的修改也是原子的①。创建一个CountDownLatch对象时,所传入的count就会赋值给state属性,当调用countDown()方法时,该线程就会尝试对state减一,而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。下面是Sync的实现代码:

/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState(); // 获取当前state属性的值
            if (c == 0) // 如果state为0,则说明当前计数器已经计数完成,直接返回
                return false; 
            int nextc = c-1;
            if (compareAndSetState(c, nextc))     // 使用CAS算法对state进行设置
                return nextc == 0;   // 设置成功后返回当前是否为最后一个设置state的线程
        }
    }
}

这里tryReleaseShared(int)方法即对state属性进行减一操作的代码。可以看到,CAS也即compare and set的缩写,jvm会保证该方法的原子性,其会比较state是否为c,如果是则将其设置为nextc(自减1),如果state不为c,则说明有另外的线程在getState()方法和compareAndSetState()方法调用之间对state进行了设置,当前线程也就没有成功设置state属性的值,其会进入下一次循环中,如此往复,直至其成功设置state属性的值,即countDown()方法调用成功。

下面就来一一分析CountDownLatch的几个核心方法的实现原理:

  • CountDown方法
public void countDown() {
     sync.releaseShared(1);
}

//AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

releaseShared方法中,可以看到先是调用到了tryReleaseShared方法,在调用tryReleaseShared(int)方法时会在无限for循环中设置state属性的值,设置成功之后其会根据设置的返回值(此时state已经自减了一),即当前线程是否为将state属性设置为0的线程,来判断是否执行if块中的代码。doReleaseShared()方法主要作用是唤醒调用了await()方法的线程。需要注意的是,如果有多个线程调用了await()方法,这些线程都是以共享的方式等待在await()方法处的,因为如果以独占的方式等待,那么当计数器减少至零时,就只有一个线程会被唤醒执行await()之后的代码,这显然不符合逻辑。如下是doReleaseShared()方法的实现代码:

private void doReleaseShared() {
    for (;;) {
        Node h = head;  // 记录等待队列中的头结点的线程
        if (h != null && h != tail) { // 头结点不为空,且头结点不等于尾节点
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { // SIGNAL状态表示当前节点正在等待被唤醒
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 清除当前节点的等待状态
                    continue;            // loop to recheck cases
                unparkSuccessor(h); // 唤醒当前节点的下一个节点
            }
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, AbstractQueuedSynchronizer.Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed 如果h还是指向头结点,说明前面这段代码执行过程中没有其他线程对头结点进行过处理
            break;
    }
}

doReleaseShared()方法中(始终注意当前方法是最后一个执行countDown()方法的线程执行的),首先判断头结点不为空,且不为尾节点,说明等待队列中有等待唤醒的线程,这里需要说明的是,在等待队列中,头节点中并没有保存正在等待的线程,其只是一个空的Node对象,真正等待的线程是从头节点的下一个节点开始存放的,因而会有对头结点是否等于尾节点的判断。在判断等待队列中有正在等待的线程之后,其会清除头结点的状态信息,并且调用unparkSuccessor(Node)方法唤醒头结点的下一个节点,使其继续往下执行。如下是unparkSuccessor(Node)方法的具体实现:

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);  // 清除当前节点的等待状态

    Node s = node.next;
    if (s == null || s.waitStatus > 0) { // s的等待状态大于0说明该节点中的线程已经被外部取消等待了
        s = null;
        // 从队列尾部往前遍历,找到最后一个处于等待状态的节点,用s记录下来
        for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
            s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread); // 唤醒离传入节点最近的处于等待状态的节点线程
}

可以看到,unparkSuccessor(Node)方法的作用是唤醒离传入节点最近的一个处于等待状态的线程,使其继续往下执行。前面我们讲到过,等待队列中的线程可能有多个,而调用countDown()方法的线程只唤醒了一个处于等待状态的线程,这里剩下的等待线程是如何被唤醒的呢?其实这些线程是被当前唤醒的线程唤醒的。具体的我们可以看看await()方法的具体执行过程。如下是await()方法的代码:

//CountDownLatch.java
public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

//AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

await()方法实际还是调用了Sync对象的方法acquireSharedInterruptibly(int)方法。可以看到acquireSharedInterruptibly(int)方法判断当前线程是否需要以共享状态获取执行权限,这里tryAcquireShared(int)方法是AbstractQueuedSynchronizer中的一个模板方法,其具体实现在前面的Sync类中,可以看到,其主要是判断state是否为零如果为零则返回1,表示当前线程不需要进行权限获取,可直接执行后续代码,返回-1则表示当前线程需要进行共享权限具体的获取执行权限的代码在doAcquireSharedInterruptibly(int)方法中,如下是该方法的具体实现:

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);  // 使用当前线程创建一个共享模式的节点
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor(); // 获取当前节点的前一个节点
            if (p == head) {  // 判断前一个节点是否为头结点
                int r = tryAcquireShared(arg);  // 查看当前线程是否获取到了执行权限
                if (r >= 0) {  // 大于0表示获取了执行权限
                    setHeadAndPropagate(node, r);  // 将当前节点设置为头结点,并且唤醒后面处于等待状态的节点
                    p.next = null; // help GC
                    failed = false; 
                    return;
                }
            }

            // 走到这一步说明没有获取到执行权限,就使当前线程进入“搁置”状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireSharedInterruptibly(int)方法中,首先使用当前线程创建一个共享模式的节点然后在一个for循环中判断当前线程是否获取到执行权限如果有(r >= 0判断)则将当前节点设置为头节点,并且唤醒后续处于共享模式的节点;如果没有,则对调用shouldParkAfterFailedAcquire(Node, Node)parkAndCheckInterrupt()方法使当前线程处于“搁置”状态,该“搁置”状态是由操作系统进行的,这样可以避免该线程无限循环而获取不到执行权限,造成资源浪费,这里也就是线程处于等待状态的位置,也就是说当线程被阻塞的时候就是阻塞在这个位置。当有多个线程调用await()方法而进入等待状态时,这几个线程都将等待在此处。这里回过头来看前面将的countDown()方法,其会唤醒处于等待队列中离头节点最近的一个处于等待状态的线程,也就是说该线程被唤醒之后会继续从这个位置开始往下执行,此时执行到tryAcquireShared(int)方法时,发现r大于0(因为state已经被置为0了),该线程就会调用setHeadAndPropagate(Node, int)方法,并且退出当前循环,也就开始执行awat()方法之后的代码。这里我们看看setHeadAndPropagate(Node, int)方法的具体实现:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node); // 将当前节点设置为头节点
     // 检查唤醒过程是否需要往下传递,并且检查头结点的等待状态
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())  // 如果下一个节点是尝试以共享状态获取获取执行权限的节点,则将其唤醒
            doReleaseShared();
    }
}

setHeadAndPropagate(Node, int)方法主要作用是设置当前节点为头结点,并且将唤醒工作往下传递,在传递的过程中,其会判断被传递的节点是否是以共享模式尝试获取执行权限的如果不是,则传递到该节点处为止(一般情况下,等待队列中都只会都是处于共享模式或者处于独占模式的节点)。也就是说,头结点会依次唤醒后续处于共享状态的节点,这也就是共享锁与独占锁的实现方式。这里doReleaseShared()方法也就是我们前面讲到的会将离头结点最近的一个处于等待状态的节点唤醒的方法。

源码译文

CountDownLatch这个类内部实现代码并不多,加上注释也才300多行(JDK 8),于是强行利用词典翻译了一下,方便以后再次阅读。虽然是直接强行翻译的,但是对于我这个菜鸡来说相对比直接看英文要能理解的更快一点 = =

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 * 一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * CountDownLatch使用给定的数count进行初始化。
 * 
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  This is a one-shot phenomenon
 * -- the count cannot be reset.  If you need a version that resets the
 * count, consider using a {@link CyclicBarrier}.
 * await方法会被阻塞,直到当前计数count由于调用countDown方法而达到零,
 * 之后所有等待的线程都会被释放,随后的任何await方法调用都会立即返回。
 * 这是一个一次性的现象,计数不能重置。如果您需要一个重新设置计数的版本,可以考虑使用CyclicBarrier。
 *
 * <p>A {@code CountDownLatch} is a versatile synchronization tool
 * and can be used for a number of purposes.  A
 * {@code CountDownLatch} initialized with a count of one serves as a
 * simple on/off latch, or gate: all threads invoking {@link #await await}
 * wait at the gate until it is opened by a thread invoking {@link
 * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>
 * can be used to make one thread wait until <em>N</em> threads have
 * completed some action, or some action has been completed N times.
 * CountDownLatch是一个通用的同步工具,可以用于多种目的。
 * 初始化一个计数为1的CountDownLatch可以作为一个简单的开/关门栓:所有调用await的
 * 线程都在门处等待,直到它被调用{#countDown}的线程打开。
 * 初始化为N的CountDownLatch可以让一个线程等待,直到N线程已经完成某个动作,或者某个动作已经完成N次。
 *
 * <p>A useful property of a {@code CountDownLatch} is that it
 * doesn't require that threads calling {@code countDown} wait for
 * the count to reach zero before proceeding, it simply prevents any
 * thread from proceeding past an {@link #await await} until all
 * threads could pass.
 * CountDownLatch的一个有用属性是它不要求调用countDown的线程在继续执行之前
 * 等待计数达到0,它只是阻止任何调用了await的线程,直到所有线程都能通过。
 *
 * <p><b>Sample usage:</b> Here is a pair of classes in which a group
 * of worker threads use two countdown latches:
 * 示例用法:这里是一对类,其中一组工作线程使用两个countdownlatch:
 * <ul>
 * <li>The first is a start signal that prevents any worker from proceeding
 * until the driver is ready for them to proceed;
 * 第一个是开始信号,阻止任何工人继续工作,直到司机准备好让他们继续;
 * <li>The second is a completion signal that allows the driver to wait
 * until all workers have completed.
 * 第二个是完成信号,允许司机等待,直到所有的工人完成。
 * </ul>
 *
 * <pre> {@code
 * class Driver { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch startSignal = new CountDownLatch(1);
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       new Thread(new Worker(startSignal, doneSignal)).start();
 *
 *     doSomethingElse();            // don't let run yet
 *     startSignal.countDown();      // let all threads proceed
 *     doSomethingElse();
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class Worker implements Runnable {
 *   private final CountDownLatch startSignal;
 *   private final CountDownLatch doneSignal;
 *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
 *     this.startSignal = startSignal;
 *     this.doneSignal = doneSignal;
 *   }
 *   public void run() {
 *     try {
 *       startSignal.await();
 *       doWork();
 *       doneSignal.countDown();
 *     } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }}</pre>
 *
 * <p>Another typical usage would be to divide a problem into N parts,
 * describe each part with a Runnable that executes that portion and
 * counts down on the latch, and queue all the Runnables to an
 * Executor.  When all sub-parts are complete, the coordinating thread
 * will be able to pass through await. (When threads must repeatedly
 * count down in this way, instead use a {@link CyclicBarrier}.)
 * 另一种典型的用法是将一个问题分成N个部分,用Runnable描述每个部分,
 * 该Runnable执行该部分并在锁存器上向下计数,然后将所有Runnables排队到一个Executor。
 * 当所有子部件完成后,协调线程将能够通过await。(当线程必须以这种方式重复倒数时,
 * 请使用{@link CyclicBarrier}。)
 *
 * <pre> {@code
 * class Driver2 { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *     Executor e = ...
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       e.execute(new WorkerRunnable(doneSignal, i));
 *
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class WorkerRunnable implements Runnable {
 *   private final CountDownLatch doneSignal;
 *   private final int i;
 *   WorkerRunnable(CountDownLatch doneSignal, int i) {
 *     this.doneSignal = doneSignal;
 *     this.i = i;
 *   }
 *   public void run() {
 *     try {
 *       doWork(i);
 *       doneSignal.countDown();
 *     } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }}</pre>
 *
 * <p>Memory consistency effects: Until the count reaches
 * zero, actions in a thread prior to calling
 * {@code countDown()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions following a successful return from a corresponding
 * {@code await()} in another thread.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     * 使当前线程等待,直到锁存器计数到零为止,除非该线程{@linkplain Thread#interrupt interrupted}。
     *
     * <p>If the current count is zero then this method returns immediately.
     * 如果当前计数为零,则此方法立即返回。
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * 如果当前计数大于零,则出于线程调度目的,当前线程将被禁用,并且在发生以下两种情况之一之前,它处于休眠状态:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * 由于{@link #countDown}方法的调用,计数达到零;或者
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * 当前线程还有其他一些线程{@linkplain Thread#interrupt interrupts}。
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
     * or the specified waiting time elapses.
     *
     * <p>If the current count is zero then this method returns immediately
     * with the value {@code true}.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of three things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     *
     * <p>If the count reaches zero then the method returns with the
     * value {@code true}.
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then the value {@code false}
     * is returned.  If the time is less than or equal to zero, the method
     * will not wait at all.
     * 如果经过了指定的等待时间,则返回值{@code false}。 
     * 如果时间小于或等于零,则该方法将根本不等待。
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if the count reached zero and {@code false}
     *         if the waiting time elapsed before the count reached zero
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     * 减少锁存器的计数,如果计数达到零,则释放所有等待的线程。
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     * 如果当前计数大于零,则将其递减。 如果新计数为零,则将重新启用所有等待线程以进行线程调度。
     *
     * <p>If the current count equals zero then nothing happens.
     * 如果当前计数等于零,那么什么也不会发生。
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     * 此方法通常用于调试和测试目的。
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Returns a string identifying this latch, as well as its state.
     * The state, in brackets, includes the String {@code "Count ="}
     * followed by the current count.
     *
     * @return a string identifying this latch, as well as its state
     */
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

传送门

参考文章

相关文章

网友评论

    本文标题:CountDownLatch笔记

    本文链接:https://www.haomeiwen.com/subject/qfxzfktx.html