美文网首页
01—juc包下循环屏障CyclicBarrier解析

01—juc包下循环屏障CyclicBarrier解析

作者: Lesie_zwc | 来源:发表于2018-09-14 23:52 被阅读0次

    1、循坏屏障可以用一句话来总结:集齐七颗龙珠,方能召唤神龙

    /**
     *  CyclicBarrier
     *  的字面意思是可循环(Cyclic)使用屏障(Barrier)。它要做的事情是让
     *  一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,
     *  屏障才会开门,所有被屏障拦截的线程才会继续干活
     *
     *  线程进入屏障通过 CyclicBarrier 的 await()方法。
     */
    public class CyclicBarrierDemo {
    
        private static final int NUMBER = 7;
    
        public static void main(String[] args) {
    
            CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {
                System.out.println("集齐七颗龙珠,方能召唤神龙");
            });
    
            for (int i = 1; i <= NUMBER; i++) {
                final int temp = i;
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName()+"\t 收集到第;"+temp+"\t 颗龙珠");
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException|BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }, String.valueOf(i)).start();
            }
        }
    }
    

    2、循环屏障源码分析

    /**
     * desc:循坏屏障的源码分析
     *
     * @author Leslie
     */
    public class CyclicBarrier {
    
    
        /**
         * 每一个屏障代表一个Generation实例
         */
        private static class Generation {
            boolean broken = false;
        }
    
        /**
         * 可重用锁用于守护屏障
         */
        private final ReentrantLock lock = new ReentrantLock();
    
        /**
         * 守护屏障锁的通信用于通知屏障被推到
         */
        private final Condition trip = lock.newCondition();
        /**
         * 屏障的线程组成数量
         */
        private final int parties;
    
        /**
         * 屏障被推到要执行的任务
         */
        private final Runnable barrierCommand;
        /**
         * 当前屏障的 Generation 实例
         */
        private CyclicBarrier.Generation generation = new CyclicBarrier.Generation();
    
        /**
         * 屏障任然在等待的组成成员数,当成员数为0的时候,
         * 屏障被推到,开始执行屏障任务,同时会被重置成和初始parties值一样
         */
        private int count;
    
        /**
         * 更新状态,当屏障被推到的时候,同时唤醒所有的等待的线程,
         * 只有当持有锁的时候,才能被调用
         */
        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new CyclicBarrier.Generation();
        }
    
        /**
         * 设置屏障被推到
         */
        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }
    
        /**
         * 每一个线程执行完一个线程任务之后,就调用此方法,让屏障中的
         * count 数量减 1
         */
        private int dowait(boolean timed, long nanos)
                throws InterruptedException, BrokenBarrierException,
                TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final CyclicBarrier.Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                int index = --count;
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        // 当所有的线程都执行完任务之后,就开始执行屏障推到之后的任务
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
                            //如果没有超时,就进行等待
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 屏障的构造器
         * @param parties 屏障的线程组成数量
         * @param barrierAction 屏障被推到时要执行的操作
         * @throws IllegalArgumentException 线程数量小于1 时抛出的异常
         */
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        /**
         * 构造一个没有任何动作执行的一个屏障
         *
         * @param parties 屏障线程组成数量
         * @throws IllegalArgumentException 参数异常
         */
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    
        /**
         * 获取屏障的线程组成数量
         *
         * @return 屏障的线程组成数量
         */
        public int getParties() {
            return parties;
        }
    
        /**
         * 屏障中的线程成员,执行完线程任务之后就调用此方法
         * 这样可以使屏障统计当前线程成员中多少线程执行完了任务
         */
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
        /**
         * 调用时 屏障线程执行任务数量减少 1
         */
        public int await(long timeout, TimeUnit unit)
                throws InterruptedException,
                BrokenBarrierException,
                TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    
        /**
         * 获取屏障的状态
         */
        public boolean isBroken() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return generation.broken;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 重置屏障中的状态
         */
        public void reset() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                breakBarrier();   // break the current generation
                nextGeneration(); // start a new generation
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 获取还有多少个线程还没有执行完其相应的任务
         */
        public int getNumberWaiting() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return parties - count;
            } finally {
                lock.unlock();
            }
        }
    
    }
    

    相应到此,你绝对掌握了CyclicBarrierl了。

    相关文章

      网友评论

          本文标题:01—juc包下循环屏障CyclicBarrier解析

          本文链接:https://www.haomeiwen.com/subject/dktzgftx.html