美文网首页
CountDownLatch 如何做到 控制线程执行顺序? 源码

CountDownLatch 如何做到 控制线程执行顺序? 源码

作者: 程序员阿奇 | 来源:发表于2020-06-30 17:34 被阅读0次

先看例子:

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch count1 = new CountDownLatch(0);
        CountDownLatch count2 = new CountDownLatch(1);
        CountDownLatch count3 = new CountDownLatch(1);

        Thread threadA = new Thread(new CounRunable(count1,count2),"Thread-A");
        Thread threadB = new Thread(new CounRunable(count2,count3),"Thread-B");
        Thread threadC = new Thread(new CounRunable(count3,count3),"Thread-C");
        threadA.start();
        threadB.start();
        threadC.start();
        Thread.sleep(1000);
    }
}

CounRunable 线程的实现 如下:

public class CounRunable implements Runnable {

    private CountDownLatch count1;
    private CountDownLatch count2;

    public CounRunable(CountDownLatch count1, CountDownLatch count2) {
        this.count1 = count1;
        this.count2 = count2;
    }

    @Override
    public void run() {
        try {
             count1.await();
            System.out.println(Thread.currentThread().getName());
            count2.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

执行结果如下:

Thread-A
Thread-B
Thread-C

Process finished with exit code 0

以上代码 确实满足 线程的 顺序执行,那是怎么做到的呢?
首先:我们创建了 三个 CountDownLatch 构造时 传入 的参数 分别 为:0、1、1,也就是说 :count1 、count2 、count3 持有的计算器 分别 为:0、1、1。那这个计数器有什么用呢?如下:

 public static void main(String[] args) throws InterruptedException {
        CountDownLatch count1 = new CountDownLatch(1);
        CountDownLatch count2 = new CountDownLatch(1);
        CountDownLatch count3 = new CountDownLatch(1);

        Thread threadA = new Thread(new CounRunable(count1,count2),"Thread-A");
        Thread threadB = new Thread(new CounRunable(count2,count3),"Thread-B");
        Thread threadC = new Thread(new CounRunable(count3,count3),"Thread-C");
        System.out.println(Thread.currentThread().getName());
        threadA.start();
        threadB.start();
        threadC.start();
}

执行结果如下:

image.png

发现 线程 阻塞住了,根据以上代码 发现 :count1,持有的计数器 变为了 1,找到 CountDownLatch 的构造方法:

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

找到 Sync 代码实现:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            // 设置状态
            setState(count);
        }

        int getCount() {
            // 获取最新的状态
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            // 根据当前状态的值 返回不同的值
            return (getState() == 0) ? 1 : -1;
        }

        // 尝试 将 state 设置为 0,
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

根据以上 代码 发现 Sync 为静态内部类 并继承了 AQS, 并重写了 tryAcquireShared 和 tryReleaseShared ,根据 new CountDownLatch(1)`` 发现 参数 1 被传递到了 setState(1)``` 这个方法,找到 setState(1);

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

     /**
     * The synchronization state.
     */
    private volatile int state;

发现 是父类 AbstractQueuedSynchronizer 即 AQS 中的一个 被 volatile 修饰的变量(对 这个关键字不了解的 请查看 我相关文章解释)。也就是说先的 state =1,接着分析,代码的执=执行:当 count1 、 count2、 count13 分别构造完成后,state 分别为 1、1、1;线程创建完成后,分别开始启动线程 Thread-A、Thread-B、Threa-C,并执行对应的 run() 方法。

 public void run() {
        try {
             count1.await();
            System.out.println(Thread.currentThread().getName());
            count2.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

根据 Thread threadA = new Thread(new CounRunable(count1,count2),"Thread-A"); 可知调用 count1 的 wait() 方法,根据 以上执行结果可知,Thread-A 被阻塞了, 找到 wait() 的实现:

   public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

发现调用了 内部类 sync 的 acquireSharedInterruptibly(1) 方法。找到 具体实现,发现调用的是父类 AQS 的实现:

    public final void acquireSharedInterruptibly(int 1) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

分析 以上代码,如果 线程被中断了 则直接抛出异常,否则会继续往下执行,找到 tryAcquireShared(1) 具体实现如下( CountDownLatch自己实现了,且 AQS 并不提供默认的实现):

    protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

跟据以上代码,acquires =1 state = 1, 也就是说 该方法的 返回值为 -1,tryAcquireShared(1) < 0 则为 true, 则继续执行 doAcquireSharedInterruptibly(1)。找到 对应的实现:

private void doAcquireSharedInterruptibly(int arg)  throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

分析以上代码:添加一个Node 节点到队列中,然后开始 自旋,然后判断 添加的Node节点 的 是否为 head 节点,如果当前的节点为 head 节点 则 执行 tryAcquireShared(1) 或去 state =1 即 r =-1,不满足 if (r >= 0),则 继续自旋,当然线程 Thread-A 也就被阻塞在了这里。那怎么跳出自旋呢?发现 当 满足 r>=0 时,也就是 state = 0 时。怎样 才能 让 state = 0 呢? 构造时候 传入 0,或者调用 countDown(); 方法:

    public void countDown() {
        sync.releaseShared(1);
    }

找到 对应的实现:

   public final boolean releaseShared(int arg) {
       if (tryReleaseShared(arg)) {
           doReleaseShared();
           return true;
       }
       return false;
   }

  protected boolean tryReleaseShared(int releases) {
           // Decrement count; signal when transition to zero
           for (;;) {
               int c = getState();
               if (c == 0)
                   return false;
               int nextc = c-1;
               if (compareAndSetState(c, nextc))
                   return nextc == 0;
           }
       }

分析以上代码:通过自旋 判断 state 是否 为 0,如果为0,返回 false ,否则 int nextx = c -1 ; 并通过CAS 设置 state 如果设置成功 并判断 nextx 是否为 0 也是 state 是否为0;如果 state= 0则 返回true 则 执行 doReleaseShared() 实现如下:

    private void doReleaseShared() {
   
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

分析以上代码:判断当前 head 节点和 tail 节点是否为head 节点, 如果条件满足,则会判断 head 节点的 waitStae 是否为 -1,如果为 -1 则 执行 unparkSuccessor(h) 唤醒线程。

相关文章

网友评论

      本文标题:CountDownLatch 如何做到 控制线程执行顺序? 源码

      本文链接:https://www.haomeiwen.com/subject/ajexqktx.html