美文网首页
并发工具类—— CountDownLatch

并发工具类—— CountDownLatch

作者: lucode | 来源:发表于2017-09-17 23:55 被阅读56次

    概述

    前段时间在解决请求风控服务器超时的问题时,涉及到到一个CountDownLunch的并发工具类,非常实用,顺记自然就去研究了一下相关的并发工具类。
    在JDK的并发包里(java.util.concurrent)提供了这样几个非常有用的并发工具类来解决并发编程的流程控制。分别是CountDownLatch、CyclicBarrier和Semaphore。

    1. CountDownLatch

    1.1 CountDownLatch是什么?

    CountDownLatch打多是被用在等待多线程完成,具体来说就是允许一个或多个线程等待其他线程完成操作。

    1.2 CountDownLatch原理?

    API 文档有这样一盒解释

    A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown(). A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

    构造函数

    //Constructs a CountDownLatch initialized with the given count.
    public void CountDownLatch(int count) {...}
    

    在 CountDownLunch启动的时候。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

    public void countDown()
    

    在每次任务执行完直接调用,计数器就会减一操作。

    public boolean await(long timeout,TimeUnit unit) throws InterruptedException
    public void await() throws InterruptedException
    

    这个方法就是用来堵塞主线程的,前者是有等待时间的,可以自定义,后者是无限等待,知道其他count 计数器为0为止。

    看图可能一下子明白
    详细的 demo 就不在这里粘贴了
    如有需要传送门

    1.3 使用场景

    超时机制

    主线程里面设置好等待时间,如果发现在规定时间内还是没有返回结果,那就唤醒主线程,抛弃。

    开始执行前等待n个线程完成各自任务

    例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了。

    死锁检测

    一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。
    若有不正之处请多多谅解,并欢迎各位大牛批评指正。

    1.4 深入源码

    这里面我简单的研究了一下CountDownLunch 源码。
    底层是由AbstractQueuedSynchronizer提供支持(后面就简称 AQS),所以其数据结构就是AQS的数据结构,而AQS的核心就是两个虚拟队列:同步队列syncQueue 和条件队列conditionQueue(前者数据结构是双向链表,后者是单向链表)不同的条件会有不同的条件队列。
    本省CountDownLunch继承的是 Object,比较简单,但是存在内部类,Sync,继承自AbstractQueuedSynchronizer,我简单理解一下

    private static final class Sync extends AbstractQueuedSynchronizer {
            // 版本号
            private static final long serialVersionUID = 4982264981922014374L;
            
            // 构造器
            Sync(int count) {
                setState(count);
            }
       
            // 返回当前计数
            int getCount() {
                return getState();
            }
    
            // 试图在共享模式下获取对象状态
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            // 试图设置状态来反映共享模式下的一个释放
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                // 无限循环
                for (;;) {
                    // 获取状态
                    int c = getState();
                    if (c == 0) // 没有被线程占有
                        return false;
                    // 下一个状态
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc)) // 比较并且设置成功
                        return nextc == 0;
                }
            }
        }
    

    1.4.1核心函数分析

    • await函数
      此函数将会使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。其源码如下
    public void await() throws InterruptedException{
          // 转发到sync对象上 
          sync.acquireSharedInterruptibly(1);
    }
    

    源码可知,对CountDownLatch对象的await的调用会转发为对Sync的acquireSharedInterruptibly(从AQS继承的方法)方法的调用。

    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

    这里先检测了线程中断状态,中断了则抛出异常,接下来调用tryAcquireShared,tryAcquireShared是Syn的实现的

            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    其实就是简单的获取了同步器的state,判断是否为0.
    接下来是

    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    关键点 我看到的是
    parkAndCheckInterrupt()

      private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
     }
    

    执行到此处时,线程会阻塞,知道有其他线程唤醒此线程,执行await之后,上文中的主线程阻塞在这。


    整个调用链路如下
    • countDown函数
      此函数将递减锁存器的计数,如果计数到达零,则释放所有等待的线程
    void countDown() { 
        sync.releaseShared(1);
    }
    

    可以看出 对countDown的调用转换为对Sync对象的releaseShared(从AQS继承而来)方法的调用。
    这里面的具体原理能力有限,有点看不懂,CAS相关的东西。

    1.5 小结

    不得不说countdownlatch是一个很高的线程控制工具,极大的方便了我们开发。由于知识能力有限,上面是自己的一点见识,有什么错误还望提出,便于我及时改进。

    相关文章

      网友评论

          本文标题:并发工具类—— CountDownLatch

          本文链接:https://www.haomeiwen.com/subject/adlmsxtx.html