CyclicBarrier

作者: 囧囧有神2号 | 来源:发表于2018-05-19 20:51 被阅读0次

    回环栅栏,通过它可以实现让一组线程在栅栏前等待,直到栅栏打开,再按AQS锁队列中的顺序依此执行,当然这里有插队情况。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。想想CountDownLatch,它是让一些线程等待另一些线程执行完再执行。
    先来看看用法:

    public class CyclicBarrierTest {
    
        static class Writer extends Thread {
            private CyclicBarrier barrier;
    
            public Writer(CyclicBarrier barrier) {
                this.barrier = barrier;
            }
    
            @Override
            public void run() {
                System.out.println("Thread name " + Thread.currentThread().getName() + " is writing");
                try {
                    Thread.sleep(5000);
                    System.out.println("Thread name " + Thread.currentThread().getName() + " write over");
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("All thread is done," +Thread.currentThread().getName() + " can start forward");
            }
        }
    
        public static void main(String[] args) {
            final int n = 4;
            final CyclicBarrier barrier = new CyclicBarrier(n,
                    () -> System.out.println("Choose " + Thread.currentThread().getName() +" thread to do this mission"));
            for (int i = 0; i < n; i++) {
                new Writer(barrier).start();
            }
        }
    }
    

    运行结果:

    Thread name Thread-0 is writing
    Thread name Thread-2 is writing
    Thread name Thread-1 is writing
    Thread name Thread-3 is writing
    Thread name Thread-3 write over
    Thread name Thread-0 write over
    Thread name Thread-1 write over
    Thread name Thread-2 write over
    Choose Thread-2 thread to do this mission
    All thread is done,Thread-2 can start forward
    All thread is done,Thread-3 can start forward
    All thread is done,Thread-1 can start forward
    All thread is done,Thread-0 can start forward
    

    所有线程都被栅栏挡住直到全部执行完(上面例子会挑一个线程执行run方法),再被一起释放;

    源码

    先来看看构造函数以及成员变量

        private final ReentrantLock lock = new ReentrantLock();
        private final Condition trip = lock.newCondition();
        private final int parties;
        private final Runnable barrierCommand;
        private Generation generation = new Generation();
    
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    

    三条线来讲源码:

    1. 调用await()
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
    1. 调用await(long timeout, TimeUnit unit)
        public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    
    1. 中断处理

    dowait

        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                int index = --count;
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    

    dowait方法是核心,我们先来按照第一条线来讲,timed = false,nanos = 0L。
    初始设置有四个线程A,B,C,D,A线程执行完自己的业务逻辑,最后await——>dowait,执行到lock(这里的lock指的是ReentrentLock),BCD执行完业务由于lock阻塞就得等待被放入到锁队列(指由AQS维护的队列,有别于Condition的条件队列)中等待,接着A会进入for循环执行trip.await(),这里调用了Condition的await方法,之前文章介绍过它,此时A就会被放入到条件队列中等待,唤醒在锁队列中等待的线程。(假设锁队列中顺序是B,C,D)B就会被唤醒,执行A一样的步骤,最后被放入条件队列等待;C也一样;最后轮到D,index == 0 为true,执行nextGeneration

        private void nextGeneration() {
     // 调用了Condition的signalAll,之前分析过,将所有条件队列中的节点放回到锁队列,
    //并不会唤醒它们
            trip.signalAll();
            // 我们说CyclicBarrier是可重复的,实现就在这,重置count与generation
            count = parties;
            generation = new Generation();
        }
    

    之后D会执行lock.unlock(),锁队列中的线程将会依此被唤醒;
    (以上假设的执行顺序只是一种情况,实际中可能BCD并不会等待,可能A之后唤醒的不是B,因为ReentrentLock默认是非公平的....等等)
    第二条线:tined为true,nanos也有了值;
    还按照上面的情景,A会执行trip.awaitNanos

            public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
    //创建当前线程节点,加入到条件队列
                Node node = addConditionWaiter();
    //唤醒锁队列中的下个节点
                int savedState = fullyRelease(node);
    //计算等待截止时间
                final long deadline = System.nanoTime() + nanosTimeout;
    //标识你的中断处理,是抛出还是重新恢复中断状态
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) { //检测是否在锁队列
                    if (nanosTimeout <= 0L) {
                        //节点状态设为0,放回到锁队列
                        transferAfterCancelledWait(node);
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold) //1秒
                        LockSupport.parkNanos(this, nanosTimeout); //挂起
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
    //尝试更改同步状态,否则继续等待
    //(执行到这说明节点已经在锁队列中。情况一:被前一个节点唤醒后,跳出循环执行到这。
    //情况二:由于超时,线程不在被挂起,抢到执行资格后将自己放回到锁队列后,
    //尝试更改同步状态,即确认是否轮到自己执行了,存在**插队**可能)
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters(); //解除可达性,方便GC回收
                if (interruptMode != 0)
                    //确定中断策略,立即抛出或恢复中断标记//
                    reportInterruptAfterWait(interruptMode); 
                return deadline - System.nanoTime(); //返回差值,用于之后判断
            }
    

    (while循环条件是isOnSyncQueue,检测节点是否被放回锁队列,为什么要有这个检测?超时节点会执行transferAfterCancelledWait放回操作;未超时signal同样会将节点放回锁队列,待之后节点在锁队列中被唤醒,会跳出循环执行acquireQueued操作。)
    接着我们第二条线的逻辑,A被放入条件队列中阻塞,在锁队列中的BCD依此被唤醒,BC加入条件队列,最后D会唤醒它们;这是一种可能,现在我们假设,A超时了,A线程恢复就绪状态,若它抢到执行资格,代码执行transferAfterCancelledWait,它会将自己放回到锁队列中并尝试更改同步状态(即代码到了acquireQueued逻辑,失败会被挂起等待唤醒),也就是尝试插队,也就是说不在遵守CyclicBarrier规则,即它不用再等其他线程执行完了。在这里我们假设它更改同步状态成功,意味着独占,接下来返回时间差值,由于超时一定是小于0 的,回到dowait执行breakBarrier,
    (这里可能会有疑惑,方法不是有锁保护吗A怎么还能执行,注意不要和Synchronized内置锁搞混,JUC是利用同步状态的CAS更改操作来进行执行资格的收放从而确保线程安全,得不到资格的会被放入到队列中挂起,利用LockSupport)
    (超时的线程会恢复线程的就绪状态所以可能插队先一步执行)

        private void breakBarrier() {
            generation.broken = true; //标记broken为true,之后线程抛异常
            count = parties; //恢复初值
            trip.signalAll();  //将所有条件队列中的节点返回锁队列(注意并不唤醒线程)
        }
    

    总结一下就是,如果有一个线程超时了,他将会把自己放回到锁队列,无论他是否插队成功,总之轮到它执行(acquireQueued成功),他会将所有条件队列中的节点放回到锁队列,然后抛TimeoutException异常,那么之后新线程(指刚调动dowait的线程会先进入锁队列,它们不会进入条件队列,轮到它执行时会直接抛BrokenBarrierException异常),那么原本在条件队列中等待的那些线程,它们被放回到了锁队列等到轮到它们执行会根据nanos来判断是否抛TimeoutException异常。
    所以一旦有线程再条件队列中等待时超时,新线程抛出BrokenBarrierException异常代表CyclicBarrier已经被破坏,实际功能相当于一个ReentrentLock。
    那么问题来了?当你看到TimeoutException时代表什么呢?它仅仅只能代表任务执行超时了,我的意思是说条件队列中线程没有超时,最后一个线程也执行完了await逻辑,此时节点全在锁队列中,它们被依次唤醒,这个过程会消耗时间,造成TimeoutException。每种异常都向你传达了相应的的信息。---
    当你看到BrokenBarrierException,代表CyclicBarrier已被破坏,就应该依据你想实现的功能来更改你的等待时间。

    中断处理

    在dowait中:

                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    

    如果当前运行的线程在进入条件队列前,检查出自己被中断,那么breakBarrier被调用,条件队列中节点被放回到锁队列,自己抛InterruptedException异常,新线程(定义在上面)会抛BrokenBarrierException。
    breakBarrier方法的调用表明CyclicBarrier规则被破坏,实际变为一个ReentrentLock。
    ConditionObject里的awaitNanos方法,一开始处会检测中断;之后若等待期间被中断,会执行checkInterruptWhileWaiting

            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            
    

    意思是将节点加入锁队列,成功则将interruptMode 赋值为THROW_IE 代表要立即抛出,否则为REINTERRUPT代表恢复中断标记,不同的中断策略在reportInterruptAfterWait具体实现。
    回到dowait

                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) { //等待中被中断
    //g.broken为false,则调用breakBarrier,抛InterruptedException异常
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
    
                            //g.broken为true,只是恢复中断标记,
                            //接下来会抛出BrokenBarrierException
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
    

    前面分析过BrokenBarrierException代表CyclicBarrier被破坏,所以我想它的重要性要过与InterruptedException,所以上面当线程两个都有时就抛BrokenBarrierException异常。

    相关文章

      网友评论

        本文标题:CyclicBarrier

        本文链接:https://www.haomeiwen.com/subject/gccidftx.html