countdownlatch源码分析

作者: 一只小哈 | 来源:发表于2016-09-25 10:40 被阅读971次

    countdownlatch是JDK提供的一个线程控制的工具类,虽然代码短少,实现简单,但是它的作用却十分的大。

    1.从一个例子开始####

    1.现有一文件,文件的大小超过100G,现在的需求是,计算文件中每一行数据的MD5值。
    2.现在要实现一个RPC请求模型,要求实现,RPC过程中的请求超时的判断和处理流程。

    先说第一个场景,第一个场景需要计算所有文件的MD5,但是100G文件处理相对较大,那么我们势必要利用多线程去并行处理大文件,并将最后的结果输出。但是如何控制主线程在所有线程结束之后结束,是一个需要思考的过程。

    第二个场景,在RPC请求发出后,我们需要在一定时间内等待请求的响应,在超时之后没有响应的,我们需要抛出异常。

    上面两种场景,其实用wait notify都可以解决,但是实现起来是比较麻烦的,但是用countdownlatch解决起来十分简单。

    拿第一个例子来说,我们简单的实现一下:

    package countdownlatch;
    
    import com.google.common.base.Charsets;
    import com.google.common.hash.HashCode;
    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 多线程处理一个文件
     */
    public class MultiThread {
        private static ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
        private static CountDownLatch latch;
        private static final int ThreadNum = 10;
    
        static {
            for (int i = 0; i < 10; i++) {
                blockingQueue.add("test" + i);
            }
            latch = new CountDownLatch(10);
        }
    
        /**
         * 用blockQueue中的元素模拟文件分片
         * @return
         */
        public static String getFileSplit() {
            return blockingQueue.poll();
        }
    
        static class myThread implements Runnable {
    
            public void run() {
                System.out.println(Thread.currentThread().getName() + "begin running...");
                String m = getFileSplit();
                HashFunction hf = Hashing.md5();
                HashCode hc = hf.newHasher()
                        .putString(m, Charsets.UTF_8)
                        .hash();
                System.out.println(hc.toString());
                try {
                    Thread.currentThread().sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
                System.out.println(Thread.currentThread().getName() + "ended");
            }
        }
    
        public static void main(String args[]){
            System.out.println("主线程开始运行");
            ExecutorService service = Executors.newFixedThreadPool(10);
            for (int i=0;i<ThreadNum;i++){
                service.execute(new Thread(new myThread()));
            }
            service.shutdown();
            System.out.println("线程已经全部运行");
            System.out.println("等待所有线程运行结束");
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("主线程退出");
        }
    }
    

    输出是这样的:
    主线程开始运行
    线程已经全部运行
    等待所有线程运行结束
    pool-1-thread-2begin running...
    pool-1-thread-6begin running...
    pool-1-thread-1begin running...
    pool-1-thread-3begin running...
    pool-1-thread-5begin running...
    pool-1-thread-9begin running...
    pool-1-thread-8begin running...
    pool-1-thread-10begin running...
    pool-1-thread-7begin running...
    pool-1-thread-4begin running...
    b04083e53e242626595e2b8ea327e525
    5e40d09fa0529781afd1254a42913847
    8ad8757baa8564dc136c1e07507f4a98
    86985e105f79b95d6bc918fb45ec7727
    739969b53246b2c727850dbb3490ede6
    5a105e8b9d40e1329780d62ea2265d8a
    4cfad7076129962ee70c36839a1e3e15
    ad0234829205b9033196ba818f7a872b
    f6f4061a1bddc1c04d8109b39f581270
    e3d704f3542b44a621ebed70dc0efe13
    pool-1-thread-3ended
    pool-1-thread-2ended
    pool-1-thread-10ended
    pool-1-thread-4ended
    pool-1-thread-7ended
    pool-1-thread-5ended
    pool-1-thread-6ended
    pool-1-thread-8ended
    pool-1-thread-1ended
    pool-1-thread-9ended
    主线程退出

    从输出我们可以看出,主线程确实是等所有线程结束后才退出的,这也正是我们预期的结果,有的童鞋就说了,我可以利用join实现和你一样的效果,但是Join是基于wait实现的,每一个线程join另一个线程就会有一个线程进入wait状态,也就是说同一时刻只有一个线程在运行,多余的CPU都浪费掉了,这显然不是很合理。

    2.说说countdownlatch的API####

    countdownlatch的API真的很少,下图是这个工具类的完整结构。


    Paste_Image.png

    我们可以看到主要方法有三个:await(),await(long, TimeUnit),countDown()

    await():阻塞当前线程,直到latch的值为0,或当前线程被中断

         * Causes the current thread to wait until the latch has counted down to
         * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
    

    await(long, TimeUnit):阻塞当前线程,知道latch为0,线程被中断,或者超时。

         * Causes the current thread to wait until the latch has counted down to
         * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
         * or the specified waiting time elapses.
    

    countDown():使latch的值减小1

           Decrements the count of the latch, releasing all waiting threads if
         * the count reaches zero.
    

    3.说说countdownlatch的实现

    countdownlatch其实是基于同步器AbstractQueuedSynchronizer实现的,ReentrantLock其实也是基于AbstractQueuedSynchronizer实现的,那么好像预示了什么。

    首先看构造函数:

        /**
         * Constructs a {@code CountDownLatch} initialized with the given count.
         *
         * @param count the number of times {@link #countDown} must be invoked
         *        before threads can pass through {@link #await}
         * @throws IllegalArgumentException if {@code count} is negative
         */
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    

    构造函数的参数是一个整数值,意思是说需要多少个latch。
    实体化Sync,sync是countdownlatch的内部类,它继承了AbstractQueuedSynchronizer。

     Sync(int count) {
                setState(count);
            }
    

    主要是将latch的值赋予AbstractQueuedSynchronizer的State
    再看await()方法:

     public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    

    await()内调用了 sync.acquireSharedInterruptibly(1) ;

    /**
         * Acquires in shared mode, aborting if interrupted.  Implemented
         * by first checking interrupt status, then invoking at least once
         * {@link #tryAcquireShared}, returning on success.  Otherwise the
         * thread is queued, possibly repeatedly blocking and unblocking,
         * invoking {@link #tryAcquireShared} until success or the thread
         * is interrupted.
         * @param arg the acquire argument
         * This value is conveyed to {@link #tryAcquireShared} but is
         * otherwise uninterpreted and can represent anything
         * you like.
         * @throws InterruptedException if the current thread is interrupted
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

    这里先检测了线程中断状态,中断了则抛出异常,接下来调用tryAcquireShared,tryAcquireShared是Syn的实现的:

            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    其实就是简单的获取了同步器的state,判断是否为0,之前博客里面有写ReentrantLock,两者的机制是一样的。因为countDownLacth实例化之后的State一般不是0,那么此方法返回-1.进入doAcquireSharedInterruptibly:

    /**
    /**
        * Acquires in shared interruptible mode.
        * @param arg the acquire argument
        */
       private void doAcquireSharedInterruptibly(int arg)
           throws InterruptedException {
           final Node node = addWaiter(Node.SHARED);
           boolean failed = true;
           try {
               for (;;) {
                   final Node p = node.predecessor();
                   if (p == head) {
                       int r = tryAcquireShared(arg);
                       if (r >= 0) {
                           setHeadAndPropagate(node, r);
                           p.next = null; // help GC
                           failed = false;
                           return;
                       }
                   }
                   if (shouldParkAfterFailedAcquire(p, node) &&
                       parkAndCheckInterrupt())
                       throw new InterruptedException();
               }
           } finally {
               if (failed)
                   cancelAcquire(node);
           }
       }
    

    这段代码是比较熟悉的在ReentrantLock中分析过,这里关键的点是parkAndCheckInterrupt()

    /**
         * Convenience method to park and then check if interrupted
         *
         * @return {@code true} if interrupted
         * 
         */
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    执行到此处时,线程会阻塞,知道有其他线程唤醒此线程,执行await之后,上文中的主线程阻塞在这。
    接下来分析下countDown():

        /**
         * Decrements the count of the latch, releasing all waiting threads if
         * the count reaches zero.
         *
         * <p>If the current count is greater than zero then it is decremented.
         * If the new count is zero then all waiting threads are re-enabled for
         * thread scheduling purposes.
         *
         * <p>If the current count equals zero then nothing happens.
         */
        public void countDown() {
            sync.releaseShared(1);
        }
    

    调用了Sync的releaseShared:

        /**
         * Releases in shared mode.  Implemented by unblocking one or more
         * threads if {@link #tryReleaseShared} returns true.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryReleaseShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         * @return the value returned from {@link #tryReleaseShared}
         */
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    接下来是tryReleaseShared

            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    

    此方法是用CAS减小State的值。如果State=0那么尝试唤醒等待线程,执行doReleaseShared:

        /**
         * Release action for shared mode -- signal successor and ensure
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    这里需要关注一点就是unparkSuccessor,这个方法是唤醒上文中的主线程。至此countdownlatch的主流程就走通了。

    不得不说countdownlatch是一个很高的线程控制工具,极大的方便了我们开发。由于知识能力有限,上面是自己的一点见识,有什么错误还望提出,便于我及时改进。

    相关文章

      网友评论

      本文标题:countdownlatch源码分析

      本文链接:https://www.haomeiwen.com/subject/zvbxyttx.html