美文网首页Java 杂谈程序员我爱编程
[Java源码][并发J.U.C]---并发工具类CyclicB

[Java源码][并发J.U.C]---并发工具类CyclicB

作者: nicktming | 来源:发表于2018-09-11 09:00 被阅读33次

    前言

    CyclicBarrier要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行. 简单地说就是人到齐了后才可以让每个人继续去做自己的事情.

    CycliBarrier是通过ReentrantLockCondition实现的一个数据结构.

    本文代码: 代码下载

    例子1

    先通过一个简单的例子了解一下CyclicBarrier.

    package com.sourcecode.concurrencytools_CyclicBarrier;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierTest4 {
        static CyclicBarrier c = new CyclicBarrier(5);
        public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
            for (int i = 0; i < 5; i++) {
                Thread thread = new MyThread();
                thread.start();
            }
        }
    
        static class MyThread extends Thread {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " tries to wait!");
                    c.await();
                } catch (Exception e) {
                    System.out.println(e);
                    //System.out.println(Thread.currentThread().getName() + "------>" + c.isBroken() + ", interrupted status:" + Thread.currentThread().isInterrupted());
                } finally {
                    System.out.println(Thread.currentThread().getName() + " finishes!");
                }
            }
        }
    }
    

    运行结果如下: 初始化CyclicBarrier的时候参数是5,表示需要等待5个线程达到后才可以打开屏障,正如结果所示,thread-0thread-3在等待,到最后一个线程thread-4到达屏障时,此时屏障打开,每个线程执行各自接下来的模块.

    如果初始化参数大于5,比如6,此程序将一直阻塞,因为没有第6个线程到达该屏障.

    Thread-0 tries to wait!
    Thread-1 tries to wait!
    Thread-2 tries to wait!
    Thread-3 tries to wait!
    Thread-4 tries to wait!
    Thread-4 finishes!
    Thread-1 finishes!
    Thread-0 finishes!
    Thread-2 finishes!
    Thread-3 finishes!
    

    实现思路分析

    cyclicbarrier.png
        private static class Generation {
          boolean broken = false;
        }
        /** 重入锁 */
        private final ReentrantLock lock = new ReentrantLock();
        /** 一个lock对象的Condition实例 */
        private final Condition trip = lock.newCondition();
        /** 拦截线程的总个数 */
        private final int parties;
        /** The command to run when tripped */
        private final Runnable barrierCommand;
        /** The current generation */
        private Generation generation = new Generation();
        /** 拦截线程的剩余需要数量 */
        private int count;
    

    从该图可以看出CyclicBarrier有一个重入锁的变量lock并且持有一个该锁的Condition实例trip,就可以大概知道该CyclicBarrier会让线程尝试获取锁并且在拿到锁后将屏障个数减减操作,然后根据count的数量来决定是否调用trip.await()操作,比如count==0表示最后一个到达屏障的线程,那么就不需要调用trip的方法了.

    构造方法

        public CyclicBarrier(int parties) {
          this(parties, null);
        }
    
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    

    第二个参数Runnable barrierAction表示的是当最后一个到达屏障的线程先执行完该barrierActionrun方法后再执行唤醒其他线程的操作.简单地说当到达屏障时,先执行barrierAction的业务再执行其他线程的业务.

    await方法

    await方法有两个,分别为await()await(long timeout, TimeUnit unit)方法,一个没有超时返回,另外一个有超时返回,但是两者都是调用dowait(boolean timed, long nanos),该方法是整个CyclicBarrier的核心实现.

    public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    public int await(long timeout, TimeUnit unit)
                throws InterruptedException,
                BrokenBarrierException,
                TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    

    所以接下来的看看该方法dowait是如何实现的.

    /**
         * @param timed 是否需要超时
         * @param nanos 时长
         * @return 返回还需要等待多少个线程才可以到达屏障
         * @throws InterruptedException 当前线程中断
         * @throws BrokenBarrierException 有其他线程中断或者其他线程超时
         * @throws TimeoutException 当前线程等待超时
         */
        private int dowait(boolean timed, long nanos)
                throws InterruptedException, BrokenBarrierException,
                TimeoutException {
            // 获取重入锁
            final ReentrantLock lock = this.lock;
            // 尝试获取锁
            lock.lock();
            try {
    
                //System.out.println(Thread.currentThread().getName() + " get locks.");
    
                // 获得当前代
                final Generation g = generation;
    
                // 如果有线程中断或者超时
                if (g.broken)
                    throw new BrokenBarrierException();
    
                // 如果当前线程被中断
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                int index = --count;
                //System.out.format("index=%d\n", index);
                if (index == 0) {  // 最后一个到达屏障的线程
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration(); //更新下一代
                        return 0;
                    } finally {
                        // 如果执行command.run发生异常,则breakBarrier
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        // 如果等待过程中有被线程中断
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    // 如果当代的broken为true,表明有线程被中断
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    // 如果换代了 表示可以返回了
                    if (g != generation)
                        return index;
    
                    // 如果超时则先break the current generation
                    // 再抛出超时异常
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                // 释放锁
                //System.out.println(Thread.currentThread().getName() + " release locks.");
                lock.unlock();
            }
        }
    /**
         *  break the current generation
         *  1. broken设置为true
         *  2. count 重新设置为parties
         *  3. 唤醒所有线程
         */
        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }
    
        /**
         *  start a new generation
         *  1. 唤醒所有等待中的线程
         *  2. count 重新设置为parties
         *  3. generation 设置成一个新的Generation对象
         */
        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }
    
    

    该方法的流程大概如下:
    1. 尝试获取锁
    2. 如果不是最后一个到达屏障的线程,则进入for循环中一直等待(此时该线程会释放锁)直到被最后一个线程唤醒或者被某个线程中断后调用breakBarrier方法唤醒. 唤醒后需要竞争再次获得锁后才可以继续执行.
    3. 如果是最后一个到达屏障的线程,如果barrierCommand不为空,则需要先执行barrierCommand.run()方法,然后通过nextGeneration唤醒等待的线程.
    4. 在所有异常退出或者正常退出都需要释放锁.
    流程图如下

    dowait.png

    例子2

    设置线程屏障为3,启动两个线程2秒超时等待,让最后一个线程3秒后才到达屏障.

    package com.sourcecode.concurrencytools_CyclicBarrier;
    import java.util.concurrent.TimeUnit;
    public class CyclicBarrierTest5 {
        static CyclicBarrier c = new CyclicBarrier(3);
        public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
            for (int i = 0; i < 2; i++) {
                Thread thread = new MyThread();
                thread.start();
            }
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread().getName() + "------>" + "tries to wait!");
            c.await();
            System.out.println(Thread.currentThread().getName() + "------>" + "finishes!");
        }
    
        static class MyThread extends Thread {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " tries to wait!");
                    c.await(1, TimeUnit.SECONDS);
                    //c.await();
                } catch (Exception e) {
                    System.out.println(Thread.currentThread().getName() + "---->" + e);
                    //System.out.println(Thread.currentThread().getName() + "------>" + c.isBroken() + ", interrupted status:" + Thread.currentThread().isInterrupted());
                } finally {
                    System.out.println(Thread.currentThread().getName() + " finishes!");
                }
            }
        }
    }
    

    结果如下: 可以看到第一个线程出现超时异常后,表示该线程已经调用了breakBarrier方法,所以可以看到后续的两个线程都是抛出BrokenBarrierException异常.

    Thread-0 tries to wait!
    Thread-1 tries to wait!
    Thread-1---->java.util.concurrent.TimeoutException
    Thread-0---->com.sourcecode.concurrencytools_CyclicBarrier.BrokenBarrierException
    Thread-0 finishes!
    Thread-1 finishes!
    main------>tries to wait!
    Exception in thread "main" com.sourcecode.concurrencytools_CyclicBarrier.BrokenBarrierException
        at com.sourcecode.concurrencytools_CyclicBarrier.CyclicBarrier.dowait(CyclicBarrier.java:69)
        at com.sourcecode.concurrencytools_CyclicBarrier.CyclicBarrier.await(CyclicBarrier.java:39)
        at com.sourcecode.concurrencytools_CyclicBarrier.CyclicBarrierTest5.main(CyclicBarrierTest5.java:14)
    

    isBroken方法和reset方法

    /**
         * @return 当前代是否被破坏, 被破坏的两种情况, 某个线程中断或者等待超时
         */
        public boolean isBroken() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return generation.broken;
            } finally {
                lock.unlock();
            }
        }
    public void reset() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                breakBarrier();   // break the current generation
                nextGeneration(); // start a new generation
            } finally {
                lock.unlock();
            }
        }
    

    reset留作遇到好的例子后再分析

    参考

    1. Java并发编程的艺术

    相关文章

      网友评论

        本文标题:[Java源码][并发J.U.C]---并发工具类CyclicB

        本文链接:https://www.haomeiwen.com/subject/jmxjgftx.html