美文网首页我爱编程程序员
[Java源码][并发J.U.C]---并发工具类CountDo

[Java源码][并发J.U.C]---并发工具类CountDo

作者: nicktming | 来源:发表于2018-09-08 09:07 被阅读3次

    前言

    CountDownLatch允许一个或多个线程等待其他线程完成操作.

    本文代码地址: 源码下载

    例子

    package com.sourcecode.concurrencytools;
    
    public class CountDownLatchTest {
        static CountDownLatch c = new CountDownLatch(2);
        public static void main(String[] args) throws InterruptedException {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(1);
                    c.countDown();
                    System.out.println(2);
                    //c.countDown();
                    /**
                     *  打开注释 会依次打印1,2,3
                     *  关闭注释 会依次打印1,2 Main线程会阻塞在await()方法
                     */
                }
            }).start();
            c.await();
            System.out.println("3");
        }
    }
    

    可以通过打开注释和关闭注释观察一下各自区别,进而可以简单理解CountDownLatch的作用.

    实现思路

    源码如下: 其实源码(总共也就一百来行)没有太多要分析的,逻辑也非常简单,主要依靠的还是AQS.

    package com.sourcecode.concurrencytools;
    
    import com.sourcecode.reentrantreadwritelock.AbstractQueuedSynchronizer;
    import java.util.concurrent.TimeUnit;
    
    public class CountDownLatch {
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            // 返回当前AQS的状态值
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                // 其实跟传入的参数acquires没有什么实质的作用
                // 根据当前AQS的状态值是否为0,如果为0就获得锁,如果不为0会进入到AQS中的acquireSharedInterruptibly方法中
                // 具体的操作需要了解AQS
                return (getState() == 0) ? 1 : -1;
            }
    
            // 释放 逻辑非常简单
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
        private final Sync sync;
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        public boolean await(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
        public void countDown() {
            sync.releaseShared(1);
        }
        public String toString() {
            return super.toString() + "[Count = " + sync.getCount() + "]";
        }
    }
    

    思路如下:
    1. CountDownLatch c = new CountDownLatch(n) 此时AQS也就是sync对象的状态值为n.
    2. c.await()函数会使任何当前线程阻塞当sync的状态值不为0时,所有调用c.await()方法的线程都会被加入到sync的同步等待队列中并且节点类型为shared. 如果sync的状态值为0时,c.await()函数会使不会阻塞,当前线程会正常执行下面的代码.
    3. c.countDown() 会使sync的状态值减1,如果状态值减为0的时候,tryReleaseShared会返回true,此时会唤醒所有调用c.await()方法而阻塞的线程.

    针对第三点做一点补充说明:看看如何唤醒所有线程的

    1. releaseShared会调用Sync父类AbstractQueuedSynchronizerreleaseShared(int arg)方法如下:

    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    2. 调用Sync重写父类的tryReleaseShared(arg)当状态值为0的时候,该方法会返回true进而会调用父类中的doReleaseShared()方法唤醒同步队列中的一个线程.
    3. 2中唤醒的线程会从AbstractQueuedSynchronizer中的doAcquireSharedInterruptibly中的parkAndCheckInterrupt()中返回进而通过tryAcquireShared去尝试获得锁,此时由于当前状态值为0,会返回1,表示获得锁,然后调用setHeadAndPropagate(node, r)(其中r=1)方法去设置头节点并且尝试去唤醒同步队列后面的线程.
    4. setHeadAndPropagate(node, r)方法在满足以下条件的情况下又会调用doReleaseShared()从而进入到1.中一步步释放所有由于c.await()方法而阻塞的线程.

    private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head;  // 记录一下旧的头节点
            setHead(node);  // 将当前节点设置为头节点
            /** 
             * 如果propagate > 0 说明锁还可以被别的线程拿到
             */
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    

    例子2: 关注异常退出

    await()除了上面讲的正常退出外,还有就是在阻塞过程中被别的线程中断的时候也会退出. 如下图所示,先启动一个自定义线程并调用await()方法并且捕获异常,在主线程中断该线程.

    package com.sourcecode.concurrencytools;
    
    import java.util.concurrent.TimeUnit;
    
    public class CountDownLatchTest3 {
        static CountDownLatch c = new CountDownLatch(1);
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new MyThread();
            thread.start();
            TimeUnit.SECONDS.sleep(1);
            thread.interrupt();
            //c.countDown();
            System.out.println(Thread.currentThread() + "----->finished!");
        }
    
        static class MyThread extends Thread {
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + "----->before await");
                    c.await();
                    System.out.println(Thread.currentThread() + "----->after await");
                } catch (InterruptedException e) {
                    System.out.println(Thread.currentThread() + "----->in interrupted exception.");
                }
                System.out.println(Thread.currentThread() + "----->finished!");
            }
        }
    }
    

    结果如下: 可以看到当主线程中断线程thread时,线程threadawait()方法中返回. 至此可以看到await方法是响应中断的.

    Thread[Thread-0,5,main]----->before await
    Thread[main,5,main]----->finished!
    Thread[Thread-0,5,main]----->in interrupted exception.
    Thread[Thread-0,5,main]----->finished!
    

    对于另外一个await(long timeout, TimeUnit unit)有三种退出方式: 原理基本上差不多就不重复分析了.
    1. 正常退出(当状态值为0)
    2. 中断退出(被其他线程中断)
    3. 超时退出(时间超过了预设等待的时间)

    参考

    1. Java并发编程的艺术

    相关文章

      网友评论

        本文标题:[Java源码][并发J.U.C]---并发工具类CountDo

        本文链接:https://www.haomeiwen.com/subject/cpsqgftx.html