美文网首页并发
AQS之CountDownLatch和Semaphore源码分析

AQS之CountDownLatch和Semaphore源码分析

作者: loveFXX | 来源:发表于2019-12-20 20:00 被阅读0次

    CountDownLatch

    示例代码

    public class CountDownLatchTest {
        public static void main(String[] args) throws InterruptedException {
            final CountDownLatch latch = new CountDownLatch(5 );
            ExecutorService service = Executors.newCachedThreadPool();
            for (int i = 0; i <5 ; i++) {
                final  int a=i;
                service.execute( new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TimeUnit.SECONDS.sleep( 2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(a);
                        latch.countDown();
                    }
                } );
            }
    //      latch.await();
            latch.await(1, TimeUnit.SECONDS);
            System.out.println("1111111111111111");
    
        }
    }
    

    CountDownLatch

    CountDownLatch#CountDownLatch
    创建CountDownLatch类,把传入参数创建AQS的实现类Sync


    image.png
    Sync

    CountDownLatch.Sync#Sync
    创建Sync类,调用setState(count)方法。把状态值state改为CountDownLatch的传入参数


    image.png

    countDown

    执行countDown方法
    CountDownLatch#countDown


    image.png
    releaseShared

    AbstractQueuedSynchronizer#releaseShared


    image.png
    tryReleaseShared

    CountDownLatch.Sync#tryReleaseShared


    image.png

    释放过程,每调用一次countDown方法。获取当前的状态值,如果等于0返回false。并重新设置减一后的状态值。直到当前状态值等于0并返回true

    doReleaseShared

    唤醒线程执行
    AbstractQueuedSynchronizer#doReleaseShared


    image.png

    这里是个死循环,AQS队列为空退出。只调用countDown方法。AQS队列没有初始化,一直为null

    await()

    CountDownLatch#await()
    此方法会一直等待,直到Latch的计数器到达0的或调用interrupt方法


    image.png

    AbstractQueuedSynchronizer#acquireSharedInterruptibly


    image.png
    CountDownLatch.Sync#tryAcquireShared
    image.png
    AbstractQueuedSynchronizer#doAcquireSharedInterruptibly

    当状态值不等于0时进入


    image.png
    addWaiter
    AbstractQueuedSynchronizer#addWaiter
    初始化队列并对当前线程入队
    image.png
    把当前调用await方法的初始化并添加当前线程到AQS队列,然后一直循环直到当前线程节点的前一个节点时head节点。再次调用
    tryAcquireShared(arg)方法,查看当前状态值state是否是0(返回1),如果是则结束循环返回。

    await(long,TimeUnit)

    java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)


    image.png

    AbstractQueuedSynchronizer#tryAcquireSharedNanos


    image.png
    AbstractQueuedSynchronizer#doAcquireSharedNanos
    image.png

    调用过程与await()相似,主要多了一步如果超时直接返回无论是否执行完线程

    Semaphore

    示例代码

    public class SemaphoreTest {
        public static void main(String[] args) throws InterruptedException {
            final Semaphore semaphore = new Semaphore( 2 );
            ExecutorService service = Executors.newCachedThreadPool();
            for (int i = 0; i <5 ; i++) {
                final  int a=i;
                service.execute( new Runnable() {
                    @Override
                    public void run() {
                        try {
    //                      semaphore.tryAcquire( 1,TimeUnit.SECONDS );
                            semaphore.acquire();
                            TimeUnit.SECONDS.sleep( 1 );
                            System.out.println(a+System.currentTimeMillis());
                            semaphore.release();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                } );
            }
    
        }
    }
    
    Semaphore

    Semaphore#Semaphore(int)
    把构造方法参数创建Sync对面,默认是非公平


    image.png

    Semaphore.Sync
    用来设置状态值state


    image.png

    acquire

    Semaphore#acquire()
    拿到一个许可信号量


    image.png

    AbstractQueuedSynchronizer#acquireSharedInterruptibly
    判断得到许可值是否有可用的


    image.png
    tryAcquireShared

    Semaphore.FairSync#tryAcquireShared


    image.png

    获取状态值。调用一次acquire()方法,状态值减1并通过CAS设置状态值,直到状态值为0。
    如示例代码线程t1、t2(获取许可信号量,state变化)


    image.png
    如果前两个线程没有调用release()方法,第三个线程t3执行acquire()。tryAcquireShared方法直接返回-1。将会调用doAcquireSharedInterruptibly()方法。
    AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
    image.png

    调用addWaiter()方法初始化队列,并把阻塞的线程t3添加到队列中,如果AQS头结点之后的节点没有获取信号量,则park当前线程。其它线程(t4、t5等等)调用tryAcquireShared方法查看当前队列有值,直接返回-1。并添加到队列中。释放节点,当第二个节点线程持有,可以持有许可信号量,便会执行setHeadAndPropagate方法重新设置头结点。

    release

    Semaphore#release()
    释放许可信号量


    image.png
    image.png

    Semaphore.Sync#tryReleaseShared


    image.png
    释放许可信号量,调用一次release方法状态值state加1。

    总结:

    CountDownLatch和Semaphore都是利用AQS的状态值state创建初始化容量大小
    CountDownLatch 调用countDown方法,数量到达0,释放所有线程
    Semaphore 调用acquire方法,持有许可。没有获取许可的线程加入AQS队列
    CountDownLatch 状态值不可还原,Semaphore状态值可以恢复初始大小

    相关文章

      网友评论

        本文标题:AQS之CountDownLatch和Semaphore源码分析

        本文链接:https://www.haomeiwen.com/subject/bmhmnctx.html