美文网首页
JUC学习笔记之CountDownLatch源码初步理解

JUC学习笔记之CountDownLatch源码初步理解

作者: Moine0828 | 来源:发表于2019-02-24 21:26 被阅读0次

    注:文中代码的解释基本上都以注释的形式和代码写在一起

    CountDownLatch是并发环境中常用的计数组件,也是基于AQS实现的。主要的方法有两个,countDown和await,实现了AQS模板方法的tryReleaseShared方法来完成countDown计数减的过程,实现了AQS模板方法的tryAcquireShared方法来实现await阻塞等待功能。

    countDown方法

    countDown方法源码如下,直接调用了内部类sync的releaseShared方法来实现,这里的Sync和ReentrantLock的内部类Sync一样,是继承了AQS的内部类,releaseShared方法正是AQS提供的共享模式的模板方法。

    public void countDown() {
       //直接调用了AQS的releaseShared 
       sync.releaseShared(1); 
    }
    

    AQS的releaseShared方法源码如下

    public final boolean releaseShared(int arg) {
            //tryReleaseShared方法AQS留给子类自己实现
            //尝试将status减去arg,如果返回为true,执行doRelease方法,否则返回
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    releaseShared方法中调用了CountDownLatch中实现的tryReleaseShared方法,源码如下

    protected boolean tryReleaseShared(int releases) {
                //通过源码我们发现传入的参数releases并没有什么用,每次计数固定减一
                // 无限循环直到值减一成功或者status变成0
                for (;;) {
                    //获取status的值,CountDownlatch中status的值代表要等待的总计数
                    int c = getState();
                    //如果已经是0了说明已经不能再减计数了,返回false
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    //CAS的方式将status减一
                    if (compareAndSetState(c, nextc))
                        如果当前减完之后,status是0,也就意味着计数结束了,返回true
                        return nextc == 0;
                }
            }
    

    tryReleaseShared方法返回为true,也就是计数结束时,会接着执行doReleaseShared方法。doReleaseShared方法在CountDownLatch中没有重写,直接调用的是AQS的doReleaseShared方法,源码如下,其中unparkSuccessor源码解析见另一篇博客《AQS源码解析》

    private void doReleaseShared() {
        //无限循环
        for (;;) {
            //获取头节点
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //如果头节点设置的是要唤醒下一个节点的等待状态
                if (ws == Node.SIGNAL) {
                    //将节点的waitStatus设置成0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                       continue;            // loop to recheck cases
                       //如果设置成功,唤醒后面的等待节点
                       unparkSuccessor(h);
                }
                //ws==0说明第一步设置成功或者原先就是0
                //将其状态设置为PROPAGATE
                //失败(PROPAGATE状态表示同步状态将会无条件传播,意思就是节点可运行)
                else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                //如果h还是头节点,就结束循环
                if (h == head)                   // loop if head changed
                    break;
         }
     }
    

    await方法

    await方法直接调用了内部类Sync的acquireSharedInterruptibly方法,阻塞线程直到count为0,当前线程才能拿到锁(或者抛出异常也有可能结束阻塞)。源码如下:

    public void await() throws InterruptedException {
            //直接调用了内部类Sync的方法(其实是AQS的方法)
            sync.acquireSharedInterruptibly(1);
        }
    

    acquireSharedInterruptibly方法,如果线程被中断过就抛出异常结束阻塞,不然就判断计数的值,为0就返回,等于当前阻塞的线程获得了锁,如果计数不为0,进入doAcquireSharedInterruptibly方法进行排队等待。源码如下:

    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            //如果线程被中断过,抛出异常,线程不再等待 
            if (Thread.interrupted())
                throw new InterruptedException();
            //tryAcquireShared尝试获取共享状态
            //tryAcquireShared源码见下方,实际就是获取计数
            //计数不为0则执行doAcquireSharedInterruptibly方法
            //计数为0则返回值大于0,方法直接返回,线程阻塞结束。等于线程获得了锁
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
        
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    

    doAcquireSharedInterruptibly方法只有在线程阻塞时会被调用,也就是计数不为0时被调用。方法将当前线程构造为一个共享模式的等待节点加入等待队列中,然后开启无限自循环,直到计数等于0获取到锁,或者抛出异常为止。源码如下:

    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            //新建一个共享模式节点加入等待队列
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    //获取新建节点的前一个节点
                    final Node p = node.predecessor();
                    //如果前一个就是头节点
                    if (p == head) {
                        //说明当前线程是第一个等待的,尝试获取锁
                        //也就是获取计数
                        int r = tryAcquireShared(arg);
                        //r不小于0说明计数已经是0了,等于当前线程已经获得了锁
                        if (r >= 0) {
                            //将当前线程的节点设置成头节点
                            setHeadAndPropagate(node, r);
                            //原先的头节点p从队列中解除,便于垃圾回收
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //shouldParkAfterFailedAcquire检查如果获取锁失败当前节点是否需要挂起
                    //只有当前一个节点的waitStatus是SIGNAL也就是说前一个节点
                    //获得锁以后会把自己唤醒,当前节点才能放心挂起
                    //parkAndCheckInterrupt判断节点是否被中断过
                    //这里意思是如果当前节点是在被挂起状态而且被中断过就抛出异常
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                //如果failed为true说明线程被中断,没有获得锁
                //所以取消获取锁的动作
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    总结来说就是await方法会让当前线程阻塞,进入方法后先判断一次计数是否为0,如果是0则直接返回,线程获得锁,阻塞结束。如果不为0则进入doAcquireSharedInterruptibly方法,将当前线程构造成了一个等待节点,开启无限循环,线程被阻塞。无限循环直到当前线程的节点排队排到了头节点的后面,就可以尝试获得锁了,如果成功了就可以返回,阻塞结束。在排队的过程中如果线程被中断那么就抛出异常。简而言之阻塞是由无限的for循环造成的,所以结束循环就是线程结束阻塞的关键了。

    提问:await方法支持多个线程一起等待吗
    回答:支持的。从源码角度看,await方法并没有做任何的同步控制,多个线程等待和一个线程等待,结果没有什么不同,所有等待的线程都会阻塞到status为0,阻塞过程中这些线程就乖乖的在队列里等待。

    相关文章

      网友评论

          本文标题:JUC学习笔记之CountDownLatch源码初步理解

          本文链接:https://www.haomeiwen.com/subject/gitiyqtx.html