java源码 - CyclicBarrier

作者: 晴天哥_王志 | 来源:发表于2018-09-01 16:12 被阅读0次

    开篇

    • CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。

    • CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。

    • CyclicBarrier的内部实现逻辑基于ReentrantLock实现,可以理解为ReentrantLock的上层应用者,通过ReentrantLock的Condtion实现线程的休眠和唤醒。

    CyclicBarrier用法demo

    public class Test {
        public static void main(String[] args) {
            int N = 4;
            CyclicBarrier barrier  = new CyclicBarrier(N);
            for(int i=0;i<N;i++)
                new Writer(barrier).start();
        }
        static class Writer extends Thread{
            private CyclicBarrier cyclicBarrier;
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
     
            @Override
            public void run() {
                System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
                try {
                    Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                    System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }catch(BrokenBarrierException e){
                    e.printStackTrace();
                }
                System.out.println("所有线程写入完毕,继续处理其他任务...");
            }
        }
    }
    
    线程Thread-0正在写入数据...
    线程Thread-3正在写入数据...
    线程Thread-2正在写入数据...
    线程Thread-1正在写入数据...
    线程Thread-2写入数据完毕,等待其他线程写入完毕
    线程Thread-0写入数据完毕,等待其他线程写入完毕
    线程Thread-3写入数据完毕,等待其他线程写入完毕
    线程Thread-1写入数据完毕,等待其他线程写入完毕
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    

    CyclicBarrier类定义

    • parties记录一共等待执行个数,count记录依然等待执行的个数。
    • barrierCommand记录所有待执行的完成后由最后一个线程执行的完成的命令。
    • Generation的代的概念来实现CyclicBarrier的复用。
    • 构造函数负责初始化parties、count、barrierCommand的核心变量。
    public class CyclicBarrier {
        // 代的类定义
        private static class Generation {
            boolean broken = false;
        }
    
        // 内部通过ReentrantLock实现线程安全的等待
        private final ReentrantLock lock = new ReentrantLock();
        // 内部通过Lock的condition实现所有waiter的信号通知
        private final Condition trip = lock.newCondition();
        // 所有等待执行的个数
        private final int parties;
        // 所有等待线程都完成任务后由最后一个线程执行的命令
        private final Runnable barrierCommand;
       
        // 通过代的概念实现复用
        private Generation generation = new Generation();
    
        // 还在等待的个数
        private int count;
    
        // 核心构造函数
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    

    CyclicBarrier工作原理

    • CyclicBarrier通过ReentrantLock来保证线程休眠和唤醒的通信。
    • 在执行过程中会对等待计数进行减一操作,值不为0当前线程进入休眠等待其他线程唤醒
    • 在执行过程中会对等待计数进行减一操作,值为0当前线程直接执行barrierCommand并且通过nextGeneration方法唤醒其他等待线程
    • 线程的休眠和唤醒都是基于ReentrantLock来实现的
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
    
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            // 通过lock来保证线程安全
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
                // 判断generation过期的情况
                if (g.broken)
                    throw new BrokenBarrierException();
                // 判断线程中断情况
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                // 递减待执行的个数计数
                int index = --count;
                // 所有待执行任务完成后执行barrierCommand命令
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        // barrierCommand命令不为null的时候执行该命令
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        // 已经执行了barrierCommand
                        ranAction = true;
                        // 重置generation用以复用并且唤醒所有等待的线程
                        //       private void nextGeneration() {
                        //           trip.signalAll();
                        //           count = parties;
                        //           generation = new Generation();
                        //        }
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // 如果count的值不为0,那么当前线程就开始进入等待
                // 外层通过lock占用锁,内层通过wait()进入休眠并释放锁
                for (;;) {
                    try {
                        if (!timed)
                            // private final Condition trip = lock.newCondition();
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
                    
                     // 各种后置处理逻辑
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    

    参考文章

    Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

    相关文章

      网友评论

        本文标题:java源码 - CyclicBarrier

        本文链接:https://www.haomeiwen.com/subject/fggpwftx.html