美文网首页
主线程等待子线程执行结束再执行—— CountDownLatch

主线程等待子线程执行结束再执行—— CountDownLatch

作者: 微子Lee | 来源:发表于2018-01-07 22:57 被阅读0次

    一、一个例子

    private static void countDownLatchTest(List<String> roles){
    CountDownLatch countDownLatch = new CountDownLatch(10);
    for (int i=0;i<10;i++){
    CountRunnable countRunnable = new CountRunnable(i+"",i+"",roles,countDownLatch);
    new Thread(countRunnable).start();
    }
    try {
    countDownLatch.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    public class CountRunnable implements Runnable{

    private String id;
    private String name;
    private List<String> roles;
    private CountDownLatch countDownLatch;
    private String password;
    
    public CountRunnable(String id, String name, List<String> roles,
                         CountDownLatch countDownLatch){
        this.name = name;
        this.id = id;
        this.roles = roles;
        this.countDownLatch = countDownLatch;
    }
    
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        roles.add(name);
        System.out.println("执行了-id:"+id+"name:"+name+"roles:"+roles.size());
        try {
            countDownLatch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public String getId() {
        return id;
    }
    
    public void setId(String id) {
        this.id = id;
    }
    
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    public List<String> getRoles() {
        return roles;
    }
    
    public void setRoles(List<String> roles) {
        this.roles = roles;
    }
    
    public String getPassword() {
        return password;
    }
    
    public void setPassword(String password) {
        this.password = password;
    }
    

    }
    输出结果:

    开始:1515333020630
    执行了-id:0name:0roles:1
    执行了-id:1name:1roles:2
    执行了-id:4name:4roles:4
    执行了-id:8name:8roles:7
    执行了-id:3name:3roles:9
    执行了-id:7name:7roles:10
    执行了-id:9name:9roles:8
    执行了-id:6name:6roles:6
    执行了-id:5name:5roles:5
    执行了-id:2name:2roles:4
    [0, 1, 2, 4, 5, 6, 8, 9, 3, 7]
    结束:1515333021639耗时:1009roles size:10
    可以看出,上面每个线程执行1000ms,总时长1009ms.

    二、源码介绍

    /**

    • Constructs a {@code CountDownLatch} initialized with the given count.
    • @param count the number of times {@link #countDown} must be invoked
    •    before threads can pass through {@link #await}
      
    • @throws IllegalArgumentException if {@code count} is negative
      */
      public CountDownLatch(int count) {
      if (count < 0) throw new IllegalArgumentException("count < 0");
      this.sync = new Sync(count);
      }
      这里看清楚一点:构造器中要指定大于0的参数。

    然后创建了一个sync对象,这个Sync是一个内部类:

    /**

    • Synchronization control For CountDownLatch.

    • Uses AQS state to represent count.
      */
      private static final class Sync extends AbstractQueuedSynchronizer {
      private static final long serialVersionUID = 4982264981922014374L;

      Sync(int count) {
      setState(count);
      }

      int getCount() {
      return getState();
      }

      protected int tryAcquireShared(int acquires) {
      return (getState() == 0) ? 1 : -1;
      }

      protected boolean tryReleaseShared(int releases) {
      // Decrement count; signal when transition to zero
      for (;;) {
      int c = getState();
      if (c == 0)
      return false;
      int nextc = c-1;
      if (compareAndSetState(c, nextc))
      return nextc == 0;
      }
      }
      }
      可以看到,Sync继承自AQS类的内部类,是CountDownLatch的核心实现。在内部类里有几个方法:

    getCount:获取当前等待的线程数
    tryAcquireShared:如果线程数为0才返回1,即当前没有等待的线程
    tryReleaseShared:重写AQS方法,实现每被countDown调用,就将标志位-1,直到标志位为0。
    而CountDownLatch的里的几个方法:

    public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
    sync.releaseShared(1);
    }

    public long getCount() {
    return sync.getCount();
    }
    acquireSharedInterruptibly:

    public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())//判断是否发生中断
    throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//注意:-1表示获取到了共享锁,1表示没有获取共享锁
    doAcquireSharedInterruptibly(arg);
    }
    继续深入doAcquireSharedInterruptibly,这段我是看不明白了:

    private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //由于采用的公平锁,所以要将节点放到队列里
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;;) {//本质是等待共享锁的释放
    final Node p = node.predecessor();//获得节点的前继
    if (p == head) { //如果前一个节点等于前继
    int r = tryAcquireShared(arg);//就判断尝试获取锁
    /*
    这里要注意一下r的值就2种情况-1和1:
    情况1.r为-1,latch没有调用countDown(),state是没有变化的导致state一直大于0或者调用了countDown(),但是state不等于0,直接在for循环中等待
    情况2.r为1,证明countDown(),已经减到0,当前线程还在队列中,state已经等于0了.接下来就是唤醒队列中的节点
    */
    if (r >= 0) {
    setHeadAndPropagate(node, r);//将当前节点设置头结点。
    p.next = null; // help GC 删除旧的头结点
    failed = false;
    return;
    }
    }
    //当前节点不是头结点,当前线程一直等待,直到获取到共享锁。
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    setHeadAndPropagate:

    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 记录头结点
    setHead(node);//设置node为头结点
    /*
    *从上面传递过来 propagate = 1;
    *一定会进入下面的判断
    */
    if (propagate > 0 || h == null || h.waitStatus < 0) {
    Node s = node.next;//获得当前节点的下一个节点,如果为最后一个节点或者,为shared
    if (s == null || s.isShared())
    doReleaseShared();//释放共享锁
    }
    }
    isShared:

    final boolean isShared() {
    return nextWaiter == SHARED;
    }
    释放共享锁,通知后面的节点:

    private void doReleaseShared() {
    for (;;) {
    Node h = head;//获得头结点
    if (h != null && h != tail) {
    int ws = h.waitStatus;//获取头结点的状态默认值为0
    if (ws == Node.SIGNAL) {如果等于SIGNAL唤醒状态
    //将头结点的状态置成0,并使用Node.SIGNAL(-1)与0比较,continue,h的状态设置为0,不会再进入if (ws == Node.SIGNAL)
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue; // loop to recheck cases
    unparkSuccessor(h);
    }//判断ws是否为0,并且h的状态不等于0,这里是个坑啊,ws等于0,h就是0啊,所以if进不来的,并设置节点为PROPAGATE
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue; // loop on failed CAS
    }
    如果头结点等于h,其实也没有一直loop,由于上面写的Node h = head,就算前面的条件都不满足,这里一定会break
    if (h == head) // loop if head changed
    break;
    }
    }
    深入到这里,以超出我的想象力。

    三、CountDownLatch总结

    CountDownLatch是通过“共享锁”实现的。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态,表示该“共享锁”最多能被count个线程同时获取。当某线程调用该CountDownLatch对象的await()方法时,该线程会等待“共享锁”可用时,才能获取“共享锁”进而继续运行。而“共享锁”可用的条件,就是“锁计数器”的值为0!而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1;通过这种方式,必须有count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待线程才能继续运行。


    image.png

    相关文章

      网友评论

          本文标题:主线程等待子线程执行结束再执行—— CountDownLatch

          本文链接:https://www.haomeiwen.com/subject/vjfvnxtx.html