CyclicBarrier

作者: 程序员札记 | 来源:发表于2022-04-14 17:42 被阅读0次

    从字面上的意思可以知道,这个类的中文意思是“循环栅栏”。大概的意思就是一个可循环利用的屏障。
    它的作用就是会让所有线程都等待完成后才会继续下一步行动。
    举个例子,CyclelicBarrier 就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,所有人到才吃饭。 然后各自离开。
    CountDownlatch 是不管人有没有到齐,谁先到谁吃饭。 等所有人吃完一起离开。

    package com.conrrentcy.juc;
    
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierDemo {
    
        static class TaskThread extends Thread {
            
            CyclicBarrier barrier;
            
            public TaskThread(CyclicBarrier barrier) {
                this.barrier = barrier;
            }
            
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(getName() + " 到达栅栏 A");
                    barrier.await();
                    System.out.println(getName() + " 冲破栅栏 A");
                    
                    Thread.sleep(2000);
                    System.out.println(getName() + " 到达栅栏 B");
                    barrier.await();
                    System.out.println(getName() + " 冲破栅栏 B");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
        public static void main(String[] args) {
            int threadNum = 5;
            CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
                
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 完成最后任务");
                }
            });
            
            for(int i = 0; i < threadNum; i++) {
                new TaskThread(barrier).start();
            }
        }
    } 
    

    构造方法

    有两个构造方法,只有带Runnable参数的构造方法才会在所有线程都到达等待点之后执行Runnable里面的run方法。

        /**
         * Creates a new {@code CyclicBarrier} that will trip when the
         * given number of parties (threads) are waiting upon it, and which
         * will execute the given barrier action when the barrier is tripped,
         * performed by the last thread entering the barrier.
         *
         * @param parties the number of threads that must invoke {@link #await}
         *        before the barrier is tripped
         * @param barrierAction the command to execute when the barrier is
         *        tripped, or {@code null} if there is no action
         * @throws IllegalArgumentException if {@code parties} is less than 1
         */
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        /**
         * Creates a new {@code CyclicBarrier} that will trip when the
         * given number of parties (threads) are waiting upon it, and
         * does not perform a predefined action when the barrier is tripped.
         *
         * @param parties the number of threads that must invoke {@link #await}
         *        before the barrier is tripped
         * @throws IllegalArgumentException if {@code parties} is less than 1
         */
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    

    维护锁状态逻辑

    其底层使用ReentrantLock+Condition进行锁状态的维护
    1、维护锁状态

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    

    2、线程组数
    private final int parties;

    3、所有线程到达等待点后执行的Runnable
    private final Runnable barrierCommand;

    4、需要等待的线程数量
    private int count;

    5、屏障点定义

    private static class Generation {
        boolean broken = false;
    }
    

    具体看看其是如何实现等待逻辑的,线程等待需要调用await方法

    public int await() {
            return dowait(false, 0L);
     }
        
    public int await(long timeout, TimeUnit unit){
        return dowait(true, unit.toNanos(timeout));
    }
    

    最终调用的是dowait方法

    private int dowait(boolean timed, long nanos){
            final ReentrantLock lock = this.lock;
            1、获取锁
            lock.lock();
            try {
                final Generation g = generation;
                if (g.broken)
                    throw new BrokenBarrierException();
                2、如果线程中断,重置等待线程数量并且唤醒当前等待的线程 
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
                3、等待线程数减1
                int index = --count;
                4、当等待线程数为 时
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        5、执行所有线程都到达等待点之后的Runnable 
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        6、唤醒所有线程并生成下一代 
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
                7、如果等待线程数不为0
                for (;;) {
                    try {
                        8、根据传入的参数来决定是定时等待还是非定时等待
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        9、线程中断之后唤醒所有线程并进入下一代
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else { 
                          Thread.currentThread().interrupt();
                        }
                    }
                    10、如果线程因为打翻屏障操作而被唤醒则抛出异常
                    if (g.broken)
                        throw new BrokenBarrierException();
                    11、如果线程因为换代操作而被唤醒则返回计数器的值
                    if (g != generation)
                        return index;
                    12、如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
     
            } finally {
                lock.unlock();
            }
        }
    

    可以看到,是通过index字段控制线程等待的,当index不为0的时候,线程统一会进行阻塞,直到index为0的时候,才会唤醒所有线程,这时候所有线程才会继续往下执行。

    重复使用

    这个跟CountdownLatch不一样的是,CountdownLatch是一次性的,而CycliBarrier是可以重复使用的,只需调用一下reset方法。

    public void reset() {
    
        final ReentrantLock lock = this.lock;
    
        lock.lock();
    
        try {
    
            1、破坏当前的屏障点并唤醒所有线程 
    
            breakBarrier();   
    
            2、生成下一代
    
            nextGeneration(); 
    
        } finally {
    
            lock.unlock();
    
        }
    
    }
    
    private void breakBarrier() {
    
        generation.broken = true;
    
        将等待线程数量重置
    
        count = parties;
    
        唤醒所有线程 
    
        trip.signalAll();
    
    }
    
    private void nextGeneration() {
    
        唤醒所有线程
    
        trip.signalAll();
    
        将等待线程数量重置
    
        count = parties;
    
        generation = new Generation();
    
    }
    

    相关文章

      网友评论

        本文标题:CyclicBarrier

        本文链接:https://www.haomeiwen.com/subject/ajztertx.html