美文网首页java进阶干货
并发辅助类CountDownLatch的使用和源码

并发辅助类CountDownLatch的使用和源码

作者: 激情的狼王 | 来源:发表于2018-01-11 13:11 被阅读0次

    CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他10个线程的任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

    CountDownLatch是通过一个计数器来实现的,计数器的初始值为那10个线程的数量也就是10。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上await()等待的线程就可以恢复执行任务。

    下面我们来详细分析

    构造函数:

    public CountDownLatch(int count) {  };  //参数count为计数值,也就是需要等几个线程结束的个数
    

    它有三个主要方法:

    public void await() throws InterruptedException { };   
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  
    public void countDown() { };  
    

    await():调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
    await(long timeout, TimeUnit unit):和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
    countDown() :将count值减1,通常在等待的线程完成时调用,当10个线程都执行完,都减1后,count值为0,被挂起的线程就可以启动了。

    实例:使用await在主线程阻塞,当每个子线程执行完了,就调用latch.countDown()一次,知道最后count的值为0,才解开主线程的等待;

    public static void main(String[] args) {
    
            final CountDownLatch latch = new CountDownLatch(2);
    
            new Thread() {
    
                public void run() {
    
                    System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
    
                    System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
    
                    latch.countDown();
    
                };
    
            }.start();
    
            new Thread() {
    
                public void run() {
    
                    System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
    
                    System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
    
                    latch.countDown();
    
                };
    
            }.start();
    
            try {
    
                System.out.println("等待2个子线程执行完毕...");
    
                latch.await();
    
                System.out.println("2个子线程已经执行完毕");
    
                System.out.println("继续执行主线程");
    
            } catch (InterruptedException e) {
    
                e.printStackTrace();
    
            }
    
        }
    

    输出结果:

    子线程Thread-0正在执行
    子线程Thread-0执行完毕
    等待2个子线程执行完毕...
    子线程Thread-1正在执行
    子线程Thread-1执行完毕
    2个子线程已经执行完毕
    继续执行主线程
    

    从输出上我们可以知道这个CountDownLatch的使用方法和执行过程了,接下来我们通过对它的主要方法的分析来看一下实现原理。

    源码解析:

    1.CountDownLatch(int count)

    这个是CountDownLatch的构造函数,我们跟进看一下

    public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    

    先判断是否count的值是否正常,如果小于0,直接抛出异常,否则创建一个Sync对象

    private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    

    我们看到这里使用了AQS的Sync,唯一与lock不同的是把state设置为了count,然后获取锁的逻辑tryAcquireShared方法也做了对应的调整,这里获取锁的话判断state是否为0。

    2.await()

    await方法用于阻塞线程,也就是令当前线程阻塞直到拿到锁(state==0也就是count==0)

    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

    先判断是否中断,如果中断的话响应中断并抛出异常,结束阻塞,然后通过tryAcquireShared获取锁,我们来看tryAcquireShared方法

    protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    拿到锁的唯一条件就是state==0,也就是子线程通过countDown()方法把count变为0才可以。我们接下来先看doAcquireSharedInterruptibly()上锁的过程。

    3.doAcquireSharedInterruptibly()
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    这里的逻辑和lock方法的逻辑基本一致,只是稍作修改,主线程通过parkAndCheckInterrupt方法进行了阻塞。

    4.countDown()
    public void countDown() {
            sync.releaseShared(1);
        }
    
    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    先调用了tryReleaseShared来解锁,我们看一下这个过程

    protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    

    拿到state,然后通过CAS把state-1,最后返回Boolean值,如果state==0,说明锁释放返回true,如果state>0,返回false,这里也就证明了我们的猜想,countdown方法就是来把state每次减一的,直到所有子线程执行完,减为0,锁释放。我们继续看锁释放后的执行过程。

    5.doReleaseShared()
    private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    这里的步骤也和lock的释放过程类似,最后通过waitStatus的判断来执行unparkSuccessor()唤醒阻塞的线程。

    6.setHeadAndPropagate()

    当唤醒await的线程后,会执行第3步doAcquireSharedInterruptibly()里的setHeadAndPropagate()

    private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    

    可以看到,把唤醒的node设置为了head节点,也就是node拿到锁可以继续执行了,那么如果有其它的await也在等待呢?此时count为0,其它的肯定也要向下执行,是怎么连续唤醒的呢,我们看本方法里的Node s = node.next;这里判断后续阻塞节点,如果存在,就执行 doReleaseShared();持续唤醒,doReleaseShared()在也就是第5步,他会解除head节点的next的阻塞,然后再执行本步骤,设置为head,循环唤醒。

    最后:

    这里循环唤醒也是共享锁的实现方式。在这里我们也再次印证了AQS是java.util.concurrent包下几乎所有类的实现核心,像CountDownLatch、CyclicBarrier和Semaphore三大辅助类,lock等都是基于AQS来实现自己的控制逻辑的。

    相关文章

      网友评论

        本文标题:并发辅助类CountDownLatch的使用和源码

        本文链接:https://www.haomeiwen.com/subject/vquznxtx.html