美文网首页
JUC-CyclicBarrier

JUC-CyclicBarrier

作者: 别拿爱情当饭吃 | 来源:发表于2018-10-22 17:26 被阅读6次

    一.什么是CyclicBarrier?
    CyclicBarrier就是循环屏障
    二.CyclicBarrier的用途?
    n个线程相互等待,直到所有线程到达同一个点后,再同时执行
    三.CyclicBarrier的应用场景
    一起去做某件事,才能完成某件事
    四.Cyclicbarrier的测试代码

    import java.util.concurrent.*;
    
    /**
     * @author Aaron
     * @date 2018/10/22 下午3:10
     * @function 测试CyclicBarrier怎样使用
     */
    public class TestCyclicBarrier {
        private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4,10,60, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
    
        private static final CyclicBarrier cb = new CyclicBarrier(4, new Runnable() {
            @Override
            //唤醒所有线程后,第一个执行的线程
            public void run() {
                System.out.println("寝室四兄弟准备一起出发去球场");
            }
        });
    
        private static class GoThread extends Thread{
            private final String name;
            public GoThread(String name){
                this.name = name;
            }
    
            public void run(){
                System.out.println(name+"开始从宿舍出发");
                try {
                    Thread.sleep(1000);
                    cb.await();//拦截线程
                    System.out.println(name+"从楼底下出发");
                    Thread.sleep(1000);
                    System.out.println(name+"到达操场");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            test2();
        }
    
        public static void test1(){
            String[] str = {"小汤","小赵","小高","小吴"};
            for (int i=0;i<4;i++){
                threadPool.execute(new GoThread(str[i]));
            }
            try {
                Thread.sleep(4000);
                System.out.println("四个人一起到达球场,现在开始打球");
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void test2(){
            String[] str = {"小汤","小高","小赵","小吴"};
            String[] str1 = {"大汤","大高","大赵","大吴"};
            for (int i=0;i<4;i++){
                threadPool.execute(new GoThread(str[i]));
    
    
            }
            try {
                Thread.sleep(4000);
                System.out.println("四个人一起到达球场,现在开始打球");
                System.out.println("现在对CyclicBarrier进行复用。。。。。。");
                System.out.println("又来了一拨人,看看愿不愿意一起打");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            //进行复用
            for (int i=0;i<4;i++){
                threadPool.execute(new GoThread(str1[i]));
    
            }
            try {
                Thread.sleep(4000);
                System.out.println("四个人一起到达球场,表示愿意一起打球,现在八个人开始打球");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    五.CyclicBarrier的源代码解析
    1.内部类

    public class CyclicBarrier {
        /**
         * Each use of the barrier is represented as a generation instance.
         * The generation changes whenever the barrier is tripped, or
         * is reset. There can be many generations associated with threads
         * using the barrier - due to the non-deterministic way the lock
         * may be allocated to waiting threads - but only one of these
         * can be active at a time (the one to which {@code count} applies)
         * and all the rest are either broken or tripped.
         * There need not be an active generation if there has been a break
         * but no subsequent reset.
         */
    这个Generation类只定义了一个布尔变量
        private static class Generation {
            boolean broken = false;
        }
    

    2.CyclicBarrier的核心代码

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;   
    //获取独占锁
        lock.lock();
        try {
    //获取当前的generation
            final Generation g = generation;
    //假如当前的generation被损坏了,就抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
    //如果当前线程被中断,调用breakBarrier中断CyclicBarrier,并唤醒所有等待的线程
            if (Thread.interrupted()) {
                breakBarrier();//见下一个方法解析
                throw new InterruptedException();
            }
    //将计数器减1
            int index = --count;
    //当index等于0时,就意味着parties个线程到达了屏障barrier
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();//唤醒所有线程
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // loop until tripped, broken, interrupted, or timed out
    //如果有parties个线程到达屏障后,或者线程被中断,或者超过最大等待时间,那么就会唤醒线程
            for (;;) {
                try {
    //判断调用的方法是“非超时等待”还是“超时等待”
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (g != generation)
                    return index;
    
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    

    3.breakBarrier方法

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    1.终止线程,并且重新设置count的值为parties
    2.唤醒所有线程
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    

    4.nextGeneration方法

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    1、唤醒所有线程
    2、重新设置count的值为parties
    3、生层新的一代
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }
    5.await()方法(本质是调用了doawait方法)
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    

    六、CyclicBarrier和CountDownLatch的区别
    1、CyclicBarrier的屏障可以循环使用;但是CountDownLatch不能循环使用
    2、CyclicBarrier是n个线程互相等待;CountDownLatch是m(一个或多个)等待n个
    3、CyclicBarrirt可以使用CountDownLatch实现,代码如下

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Aaron
     * @date 2018/10/22 下午3:39
     * @function 用CountDownLatch实现CyclicBarrier
     */
    public class TestCyclicBarrierWithCDL {
        private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(4);
        private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(4,10,60, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
    
        private static class GoHead extends Thread{
            private final String name;
            public GoHead(String name){
                this.name = name;
            }
            public void  run(){
                System.out.println(name+"开始出发");
                COUNT_DOWN_LATCH.countDown();
    
                try {
                    Thread.sleep(1000);
                    COUNT_DOWN_LATCH.await();
                    System.out.println(name+"从楼底下出发");
                    Thread.sleep(1000);
                    System.out.println(name+"到达操场");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            String[] str = {"A","B","C","D"};
            for (int i=0;i<4;i++){
                THREAD_POOL_EXECUTOR.execute(new GoHead(str[i]));
            }
            try {
                Thread.sleep(4000);
                System.out.println("四个人一起到达球场,现在开始打球");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:JUC-CyclicBarrier

          本文链接:https://www.haomeiwen.com/subject/drggzftx.html