美文网首页JUC并发相关
17. 并发终结之CyclicBarrier

17. 并发终结之CyclicBarrier

作者: 涣涣虚心0215 | 来源:发表于2020-09-27 03:56 被阅读0次

CyclicBarrier是让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程到达屏障,屏障打开,所有被阻塞的线程继续运行。

与CountDownLatch比较

CountDownLatch是一组线程等待其他一个或一组线程完成工作之后再执行,类似加强版join,await()用来等待,countDown()负责计数器的减一。
CyclicBarrier是一组线程内部进行等待,且可以循环使用
CyclicBarrier(int parties, Runnable barrierAction)这个是在屏障开放时,barrierAction定义的线程会被执行。即用barrierAction来指定一个任务,以实现一种等待线程结束的效果;barrierAction中的任务只有在目标线程结束后才能被执行。
事实上,这完全可以通过Thread.jon()或者CountDownLatch来实现。
如果CyclicBarrier.await()的调用不在一个循环之中,并且使用的目的也不知为了模拟高并发,这就是滥用。

底层执行流程:
  1. 初始化CyclicBarrier中各种成员变量,包括parties(不可变),count(计数器)以及Runnable(可选)
  2. 当调用await方法时,底层先检查计数器是否归零,如果是的话,执行可选Runnable,并进入下一个generation。
  3. 进入nextGeneration,将会重置count = parties,并且创建新的generation实例。同时会调用signAll方法唤醒所有屏障前等待的线程。
  4. 如果计数器没有归零,那么当前调用condition.await()方法,在屏障前等待。
  5. 以上方法都在lock锁中,所以不会出现并发的情况。
源码分析
  1. 成员变量
    CyclicBarrier内部使用lock和condition来使一组线程来等待以及唤醒。
    final的parties表示线程数量,count表示一个generation里面在WAITING的线程个数,初始值是parties。
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;
  1. await
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    //拿到lock并进行上锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //拿到generation
        final Generation g = generation;
        //如果这个generation是被broken的,则抛出BrokenBarrierException
        if (g.broken)
            throw new BrokenBarrierException();
        //如果当前线程是被中断了
        if (Thread.interrupted()) {
            //breakBarrier会设置generation.broken=true,重置count=parties,并且signalAll唤醒所有等待的线程
            breakBarrier();
            throw new InterruptedException();
        }
         //每次一个线程进入await,就减一下count
        int index = --count;
        //当count减为0表示所有线程都到达屏障,则调用barrierCommand
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                //nextGeneration则会唤醒所有的等待线程,并且重置count=parties以及generation
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        //如果count没有到0,表示还有线程没有到达屏障,则当前线程一直await,加入condition queue,等待唤醒。
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation)
                return index;
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

相关文章

  • 17. 并发终结之CyclicBarrier

    CyclicBarrier是让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程到达屏障,屏障打开,所有被阻塞的...

  • JUC之AQS—Cyclicbarrier

    导读:这篇文章介绍的是java并发组件aqs之CyclicBarrier Cyclicbarrier概念: Cyc...

  • java并发之CyclicBarrier

    java并发之CyclicBarrier 知识导读 CyclicBarrier是一个阻塞器,当阻塞线程达到Cycl...

  • 高并发(9)- 线程并发工具类-CyclicBarrier

    @[TOC](高并发(9)- 线程并发工具类-CyclicBarrier) 前言 上篇文章讲解了线程的并发工具类之...

  • Java并发之CyclicBarrier

    barrier(屏障)与互斥量、读写锁、自旋锁不同,它不是用来保护临界区的。相反,它跟条件变量一样,是用来协同多线...

  • Java并发之CyclicBarrier

    barrier(屏障)与互斥量、读写锁、自旋锁不同,它不是用来保护临界区的。相反,它跟条件变量一样,是用来协同多线...

  • 第八章 并发工具类

    并发包提供的并发工具类。并发编程的瑞士军刀。 CountDownLatch、CyclicBarrier和Semap...

  • CyclicBarrier--循环栅栏

    1、引入循环栅栏CyclicBarrier CyclicBarrier是另外一种多线程并发控制工具。和CountD...

  • 记录CountDownLatch

    Java并发编程:CountDownLatch、CyclicBarrier和 Semaphore CountDow...

  • Java并发包之CyclicBarrier

    CountDownLatch:同步援助,允许一个或多个线程等待其他线程正在执行的一组操作完成。CyclicBarr...

网友评论

    本文标题:17. 并发终结之CyclicBarrier

    本文链接:https://www.haomeiwen.com/subject/hnowhktx.html