美文网首页
Java CountDownLatch

Java CountDownLatch

作者: rabbit_coding | 来源:发表于2018-10-20 18:38 被阅读0次

    CountDownLatch 通过初始化一个计数器,通过countDown减少计数器,当计数器变成0时,才开始执行await之后的操作。在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

    CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await。

    虽然,CountDownlatch与CyclicBarrier有那么点相似,但是他们还是存在一些区别的:

    1、CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。

    2、 CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。

    CountDownLatch介绍

    import java.util.Date;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CountDownLatchTest {
        public static CountDownLatch countDownLatch = new CountDownLatch(2);
        private static ExecutorService executor = Executors.newFixedThreadPool(2);
    
        public static void countDown() {
            try {
                System.out.println(Thread.currentThread().getName() + "开始执行");
                Thread.sleep(10000);
                countDownLatch.countDown();
                System.out.println(Thread.currentThread().getName() + "执行完成");
            } catch (Exception e) {
    
            }
    
        }
    
        public static class Task implements Runnable {
            @Override
            public void run() {
                countDown();
            }
        }
    
        public static void main(String[] args) throws Exception {
            System.out.println(new Date() + ":主线程开始等待");
            for (int i = 0; i < 2; i++) {
                executor.execute(new Task());
            }
            countDownLatch.await();
            System.out.println(new Date() + ":主线程执行完");
        }
    }
    
    Sat Oct 20 18:03:49 CST 2018:主线程开始等待
    pool-1-thread-1开始执行
    pool-1-thread-2开始执行
    pool-1-thread-1执行完成
    pool-1-thread-2执行完成
    Sat Oct 20 18:03:59 CST 2018:主线程执行
    

    可以看到主线程会在await处阻塞,一直到其他两个线程调用countDown,将CountDownLatch减少到0,才会执行await之后的操作

    CountDownLatch源码分析

    初始化就比较简单了,主要就是去setState,countDown和await都是对这个state进行操作,同时我们会发现CountDownLatch使用的是共享锁

      public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    

    CountDownLatch常用的2个方法,await和countDown,await在计数器不为0的时候,阻塞线程,将线程挂起,当countDown将计数器减少到0的时候,会唤起所以阻塞的线程。
    await():使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
    await(long timeout, TimeUnit unit): 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间

    来看一下await方法如何实现阻塞线程的

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

    acquireSharedInterruptibly()的作用是获取共享锁。如果在获取共享锁过程中线程中断则抛出InterruptedException异常。否则通过tryAcquireShared方法来尝试获取共享锁。如果成功直接返回,否则调用doAcquireSharedInterruptibly方法

        //CountDownLatch重写tryAcquireShared
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    可以看到,await方法,通过判断计数器state是否为0 ,如果不为0,就doAcquireSharedInterruptibly,其实就是将当前线程封装成node,插入到CLH(等待锁的线程队列)尾部,根据tryAcquireShared(即state是否为0的条件),如果不为0,挂起线程,等待被唤醒

    doAcquireSharedInterruptibly是AQS中的函数,在分析Semaphore源码https://www.jianshu.com/p/12093d997c02的时候分析过了,就不讲了

    CountDownLatch的countDown()方法递减锁存器的计数,如果计数到达零,则释放所有等待的线程,doReleaseShared通过unparkSuccessor的LockSupport.unpark函数,将线程唤醒

        public void countDown() {
            sync.releaseShared(1);
        }
        public final boolean releaseShared(int arg) {
            //计数器为0的时候,释放所有的线程
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                //当前计数器state
                int c = getState();
                if (c == 0)
                    return false;
                //countDown减少计数器
                int nextc = c-1;
                //计数器为0,返回true
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    总结

    CountDownLatch内部通过“共享锁”实现,在创建CountDownLatch时,需要传递一个int类型的state参数,该state参数为“锁状态”的初始值,该值表示着该“共享锁”可以同时被多少线程获取。当某个线程调用await方法时,如果state==0,表示可获取共享锁,否则一直处于等待直到获取为止。当线程调用countDown方法时,计数器state – 1。当在创建CountDownLatch时初始化的state参数,必须要调用state次的countDown方法才会使计数器state等于0,前面等待的线程才会继续运行。

    相关文章

      网友评论

          本文标题:Java CountDownLatch

          本文链接:https://www.haomeiwen.com/subject/ryymzftx.html