美文网首页
CountDownLatch源码解析

CountDownLatch源码解析

作者: i砖工 | 来源:发表于2020-05-27 16:03 被阅读0次

    总体介绍

    一个同步器:能够让一个或者多个线程等待等待某个条件的到来再继续执行。


    CountDownLatch模型

    大家可以把CountDownLatch初始化的值认为是有N道门,刚开始是线程调用await方法发现门关着的,所以就只有等待。而外界条件的变化是通过countDown来实现,可以认为countDown一次就是打开一道门,当countDown的次数为N时,则全部的门打开,则刚才在await处等待的线程将继续执行任务。
    看下具体的代码实现:

    public class CountDownLatch {
    
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
         ...
        }
    
        private final Sync sync;
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        public void countDown() {
            sync.releaseShared(1);
        }
    
    }
    

    CountDownLatch的代码非常简单,内部有一个基于AQS实现的同步器,重载了构造方法,然后就是await和countDown方法。

    同步器的实现

    我们先看一下同步器的实现:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {  //同步器的构造方法,必须传入初始化状态
            setState(count);
        }
           //获取状态
        int getCount() {
            return getState();
        }
    /**
    AQS的tryAcquireShared的实现,看到tryAcquireShared我们更多的会想起ReentrantReadWriteLock;
    它的同步器同样对tryAcquireShared进行了实现,但是他们之间的实现有很大区别:
    这里的tryAcquireShared过程根本没有对state进行累加,反而是只判断state是否等于0,
    所以这里大家就可以把state想象成我们刚才说的门,state为多少就有多少门。
    **/
    //代码1
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
    
    //这里的tryReleaseShared与读写锁的实现逻辑大致一样
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    

    await方法

    接下来我们来看下是如果利用AQS实现latch的,首先我们看await方法:

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); //调用同步器获取锁
    }
    

    同步器调用AQS的方法:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)  
            doAcquireSharedInterruptibly(arg);
    }
    

    这里整体的方法已经在AQS的源码解析过专门讲解过,共享节点如果放入同步队列,如何等待,如果冒泡共享锁等,这里不再专门说,我们要关注的核心是tryAcquireShared,由代码1我们可以看到tryAcquireShared的实现只有当state为0时才会获取锁成功(CountDownLatch的初始会将state出初始化为非0),否则都是获取锁失败,而获取锁失败的线程自然就进入了同步队列进行等待。这就是实现一个或者多少线程并发的情况,协同的等待某个条件。

    countDown方法

    再来看下如何实现打开这N个门的:

    public void countDown() {
        sync.releaseShared(1);
    }
    

    看到这段代码其实就已经足够了,每次countDown都会释放一次锁(其实就是state减1,其实就是打开一道门),当初始化的值N都被减去后,则在await上等待锁的线程将会被唤醒(释放锁的线程会尝试唤醒等待锁的节点),从而继续执行任务。

    总结:

    1.CountDownLatch使用AQS的共享模式实现。
    2.初始化N时其实相当于初始化了N把共享锁,只是这N把锁不是通过tryAcquireShared来获取的,而是直接初始化的。
    3.业务线程调用await相当于就是等待初始化的N把锁释放
    4.countDown其实就相当于释放着N把锁,一次释放一把

    相关文章

      网友评论

          本文标题:CountDownLatch源码解析

          本文链接:https://www.haomeiwen.com/subject/uviwahtx.html