美文网首页
CountDownLatch 如何做到 控制线程执行顺序? 源码

CountDownLatch 如何做到 控制线程执行顺序? 源码

作者: 程序员阿奇 | 来源:发表于2020-06-30 17:34 被阅读0次

    先看例子:

    public class CountDownLatchDemo {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch count1 = new CountDownLatch(0);
            CountDownLatch count2 = new CountDownLatch(1);
            CountDownLatch count3 = new CountDownLatch(1);
    
            Thread threadA = new Thread(new CounRunable(count1,count2),"Thread-A");
            Thread threadB = new Thread(new CounRunable(count2,count3),"Thread-B");
            Thread threadC = new Thread(new CounRunable(count3,count3),"Thread-C");
            threadA.start();
            threadB.start();
            threadC.start();
            Thread.sleep(1000);
        }
    }
    

    CounRunable 线程的实现 如下:

    public class CounRunable implements Runnable {
    
        private CountDownLatch count1;
        private CountDownLatch count2;
    
        public CounRunable(CountDownLatch count1, CountDownLatch count2) {
            this.count1 = count1;
            this.count2 = count2;
        }
    
        @Override
        public void run() {
            try {
                 count1.await();
                System.out.println(Thread.currentThread().getName());
                count2.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    执行结果如下:

    Thread-A
    Thread-B
    Thread-C
    
    Process finished with exit code 0
    

    以上代码 确实满足 线程的 顺序执行,那是怎么做到的呢?
    首先:我们创建了 三个 CountDownLatch 构造时 传入 的参数 分别 为:0、1、1,也就是说 :count1 、count2 、count3 持有的计算器 分别 为:0、1、1。那这个计数器有什么用呢?如下:

     public static void main(String[] args) throws InterruptedException {
            CountDownLatch count1 = new CountDownLatch(1);
            CountDownLatch count2 = new CountDownLatch(1);
            CountDownLatch count3 = new CountDownLatch(1);
    
            Thread threadA = new Thread(new CounRunable(count1,count2),"Thread-A");
            Thread threadB = new Thread(new CounRunable(count2,count3),"Thread-B");
            Thread threadC = new Thread(new CounRunable(count3,count3),"Thread-C");
            System.out.println(Thread.currentThread().getName());
            threadA.start();
            threadB.start();
            threadC.start();
    }
    

    执行结果如下:

    image.png

    发现 线程 阻塞住了,根据以上代码 发现 :count1,持有的计数器 变为了 1,找到 CountDownLatch 的构造方法:

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    

    找到 Sync 代码实现:

        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                // 设置状态
                setState(count);
            }
    
            int getCount() {
                // 获取最新的状态
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                // 根据当前状态的值 返回不同的值
                return (getState() == 0) ? 1 : -1;
            }
    
            // 尝试 将 state 设置为 0,
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    

    根据以上 代码 发现 Sync 为静态内部类 并继承了 AQS, 并重写了 tryAcquireShared 和 tryReleaseShared ,根据 new CountDownLatch(1)`` 发现 参数 1 被传递到了 setState(1)``` 这个方法,找到 setState(1);

        /**
         * Sets the value of synchronization state.
         * This operation has memory semantics of a {@code volatile} write.
         * @param newState the new state value
         */
        protected final void setState(int newState) {
            state = newState;
        }
    
         /**
         * The synchronization state.
         */
        private volatile int state;
    

    发现 是父类 AbstractQueuedSynchronizer 即 AQS 中的一个 被 volatile 修饰的变量(对 这个关键字不了解的 请查看 我相关文章解释)。也就是说先的 state =1,接着分析,代码的执=执行:当 count1 、 count2、 count13 分别构造完成后,state 分别为 1、1、1;线程创建完成后,分别开始启动线程 Thread-A、Thread-B、Threa-C,并执行对应的 run() 方法。

     public void run() {
            try {
                 count1.await();
                System.out.println(Thread.currentThread().getName());
                count2.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    根据 Thread threadA = new Thread(new CounRunable(count1,count2),"Thread-A"); 可知调用 count1 的 wait() 方法,根据 以上执行结果可知,Thread-A 被阻塞了, 找到 wait() 的实现:

       public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    

    发现调用了 内部类 sync 的 acquireSharedInterruptibly(1) 方法。找到 具体实现,发现调用的是父类 AQS 的实现:

        public final void acquireSharedInterruptibly(int 1) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

    分析 以上代码,如果 线程被中断了 则直接抛出异常,否则会继续往下执行,找到 tryAcquireShared(1) 具体实现如下( CountDownLatch自己实现了,且 AQS 并不提供默认的实现):

        protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    跟据以上代码,acquires =1 state = 1, 也就是说 该方法的 返回值为 -1,tryAcquireShared(1) < 0 则为 true, 则继续执行 doAcquireSharedInterruptibly(1)。找到 对应的实现:

    private void doAcquireSharedInterruptibly(int arg)  throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    分析以上代码:添加一个Node 节点到队列中,然后开始 自旋,然后判断 添加的Node节点 的 是否为 head 节点,如果当前的节点为 head 节点 则 执行 tryAcquireShared(1) 或去 state =1 即 r =-1,不满足 if (r >= 0),则 继续自旋,当然线程 Thread-A 也就被阻塞在了这里。那怎么跳出自旋呢?发现 当 满足 r>=0 时,也就是 state = 0 时。怎样 才能 让 state = 0 呢? 构造时候 传入 0,或者调用 countDown(); 方法:

        public void countDown() {
            sync.releaseShared(1);
        }
    

    找到 对应的实现:

       public final boolean releaseShared(int arg) {
           if (tryReleaseShared(arg)) {
               doReleaseShared();
               return true;
           }
           return false;
       }
    
      protected boolean tryReleaseShared(int releases) {
               // Decrement count; signal when transition to zero
               for (;;) {
                   int c = getState();
                   if (c == 0)
                       return false;
                   int nextc = c-1;
                   if (compareAndSetState(c, nextc))
                       return nextc == 0;
               }
           }
    

    分析以上代码:通过自旋 判断 state 是否 为 0,如果为0,返回 false ,否则 int nextx = c -1 ; 并通过CAS 设置 state 如果设置成功 并判断 nextx 是否为 0 也是 state 是否为0;如果 state= 0则 返回true 则 执行 doReleaseShared() 实现如下:

        private void doReleaseShared() {
       
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    分析以上代码:判断当前 head 节点和 tail 节点是否为head 节点, 如果条件满足,则会判断 head 节点的 waitStae 是否为 -1,如果为 -1 则 执行 unparkSuccessor(h) 唤醒线程。

    相关文章

      网友评论

          本文标题:CountDownLatch 如何做到 控制线程执行顺序? 源码

          本文链接:https://www.haomeiwen.com/subject/ajexqktx.html