美文网首页
CountDownLatch的使用与解析

CountDownLatch的使用与解析

作者: Aisen | 来源:发表于2018-11-23 20:43 被阅读21次

    引言

    CountDownLatch是jdk1.5开始concurrent包里提供的,并发编程工具类。

    这个类能够使一个线程等待其他线程完成各自的工作后再执行,可用于多线程的并发执行。

    例如,应用程序的主线程希望在多个网络请求线程并发执行完后,刷新页面,避免串行请求导致网络请求耗时长。

    CountDownLatch的使用

    CountDownLatch的主要使用步骤是

    1、初始化,指定线程个数,CountDownLatch latch = new CountDownLatch(3);

    参数4代表线程的总数

    2、每个线程执行后执行latch.countDown();,代表一个线程执行完成,待完成的线程数减1。

    3、在线程添加latch.await();,阻塞该线程,等待其他子线程完成。

    Demo如下

    package com.example.zzh.myapplication;
    
    import java.util.concurrent.CountDownLatch;
    
    public class CountDownLatchDemo {
    
        public static void main(String[] args) throws InterruptedException {
            // Let us create task that is going to
            // wait for four threads before it starts
            CountDownLatch latch = new CountDownLatch(3);
    
            long start = System.currentTimeMillis();
    
            // Let us create four worker
            // threads and start them.
            WorkerThread first = new WorkerThread(1000, latch, "worker-1");
            WorkerThread second = new WorkerThread(2000, latch, "worker-2");
            WorkerThread third = new WorkerThread(3000, latch, "worker-3");
    
            first.start();
            second.start();
            third.start();
    
            // The main task waits for four threads
            latch.await();
    
            // Main thread has started
            System.out.println(Thread.currentThread().getName() + " has finished. Spend Time = " + (System.currentTimeMillis() - start));
        }
    
        // A class to represent threads for which
        // the main thread waits.
        static class WorkerThread extends Thread {
    
            private int delay;
            private CountDownLatch latch;
    
            public WorkerThread(int delay, CountDownLatch latch, String name) {
                super(name);
                this.delay = delay;
                this.latch = latch;
            }
    
            @Override
            public void run() {
                try {
                    Thread.sleep(delay);
                    latch.countDown();
                    System.out.println(Thread.currentThread().getName() + " finished");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    

    运行结果

    worker-1 finished
    worker-2 finished
    worker-3 finished
    main has finished. Spend Time = 3006
    

    CountDownLatch的解析

    1、什么是AQS(AbstractQueuedSynchronizer)

    深入CountDownLatch源码,需要了解AQS(AbstractQueuedSynchronizer),因为CountDownLatch的底层原理是通过AQS(AbstractQueuedSynchronizer)里面的共享锁来实现的。

    推荐阅读:【死磕Java并发】—–J.U.C之AQS(一篇就够了)

    以下是上述文章的引用:

    AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架,JUC并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。它是JUC并发包中的核心基础组件。

    AQS解决了实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。

    AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

    AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

    AQS的使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。AQS提供了独占锁和共享锁必须实现的方法。

    共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。对应的是独占锁,是一种悲观锁,它避免了读/读冲突,如果某个只读线程获取锁,则其他读线程都只能等待,这样就限制了不必要的并发性,因为读操作并不会影响数据的一致性。

    在AQS中,共享锁获取锁,节点模式则为Node.SHARED。独占锁获取锁时,设置节点模式为Node.EXCLUSIVE

    CountDownLatch使用的是共享锁,继承AQS的方法有:

    • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;

    • tryReleaseShared(int arg):共享式释放同步状态。

    上面Demo的队列同步器模型如下(参考这里

    image.png

    2、初始化源码解析

    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
    
        Sync(int count) {
            setState(count);
        }
    
        int getCount() {
            return getState();
        }
    
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
    
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState(); // 获取主存中的state值
                if (c == 0)
                    return false; //state已经为0 直接退出
                int nextc = c-1; // 减一 准备cas更新该值
                if (compareAndSetState(c, nextc)) //cas更新status值为nextc
                    return nextc == 0; //更新成功 判断是否为0 退出;更新失败则继续for循环,直到线程并发更新成功
            }
        }
    }
    
    private final Sync sync;
    
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    

    初始化做的工作是创建同步器实例,这个同步器就是上文提到的继承AQS的类,并实现共享锁方法。

    3、latch.countDown()解析

    public void countDown() {
        sync.releaseShared(1);
    }
        
    //AbstractQueuedSynchronizer.java
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    其中tryReleaseShared是上文实现的方法,主要的工作是CAS更新state值减一,并判断是否为0,如果为0返回true,说明所有线程都执行完成,可以做唤醒的工作doReleaseShared

    //AbstractQueuedSynchronizer.java
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    

    上面的逻辑是:

    如果当前节点是SIGNAL意味着,它正在等待一个信号,或者说它在等待被唤醒,因此做两件事,一是重置waitStatus标志位,二是重置成功后,唤醒下一个节点。

    如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。意味着需要将状态向后一个节点传播。

    这个死循环,退出的路只有一条,那就是h==head,即该线程是头节点,且状态为共享状态。

    4、latch.await()解析

    await是阻塞当前线程(中断被抛中断异常),等待被唤醒,源码如下

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    //AbstractQueuedSynchronizer.java
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    

    上面的逻辑是:

    如果线程被中断,则抛出异常。然后判断tryAcquireShared方法的返回值是否小于0,这个方法是第2步初始化实现的,当(getState() == 0)时则返回1,否则返回-1,即当state还没有减少到0时,则执行doAcquireSharedInterruptibly(arg)

    //AbstractQueuedSynchronizer.java
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);// 往同步队列中添加节点
        boolean failed = true;
        try {
            for (;;) { // 一个死循环 跳出循环只有下面两个途径
                final Node p = node.predecessor(); // 当前线程的前一个节点
                if (p == head) {
                    int r = tryAcquireShared(arg); //当getState()==0时则返回1,否则返回-1
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);// 处理后续节点
                        p.next = null; // help GC
                        failed = false;
                        return;//当getState为0,并且为头节点,则跳出循环
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();// 响应打断 跳出循环
            }
        } finally {
            if (failed)
                cancelAcquire(node); //如果是打断退出的,则移除同步队列节点
        }
    }
    

    在同步队列中挂起的线程,它们自旋的形式查看自己是否满足条件醒来(state==0,且为头节点),如果成立(即被唤醒),将调用setHeadAndPropagate这个方法

    private void setHeadAndPropagate(Node node, int propagate) {
         Node h = head; // Record old head for check below
         setHead(node);
         if (propagate > 0 || h == null || h.waitStatus < 0) {
             Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    

    这个方法是将当前节点的下一个节点设置为头节点,且它也调用了doReleaseShared这个方法,在第3步解析latch.countDown中提到,这个方法就是将头节点设置为共享状态的,由此,共享状态传播下去。

    扩展内容

    1、CountDownLatch的优缺点

    优点:

    对使用者而言,你只需要传入一个int型变量控制任务数量即可,至于同步队列的出队入队维护,state变量值的维护对使用者都是透明的,使用方便。

    缺点:

    CountDownLatch设置了state后就不能更改,也不能循环使用。

    2、CountDownLatch的超时处理

    如果线程等待超过一定时间,可以取消阻塞被唤醒,那么可以通过设置await的参数

    //等待超过2s,自动被唤醒
    latch.await(2000, TimeUnit.MILLISECONDS);
    

    参考

    Java CountDownLatch解析(上)

    Java CountDownLatch解析(下)

    【死磕Java并发】—–J.U.C之AQS(一篇就够了)

    Java并发-独占锁与共享锁

    java共享锁实现原理及CountDownLatch解析

    CountDownLatch in Java

    相关文章

      网友评论

          本文标题:CountDownLatch的使用与解析

          本文链接:https://www.haomeiwen.com/subject/jkssqqtx.html