美文网首页
AQS源码浅析(10)——CyclicBarrier

AQS源码浅析(10)——CyclicBarrier

作者: 墨_0b54 | 来源:发表于2022-04-27 16:52 被阅读0次

    CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会【开门】,所有被屏障拦截的线程才会继续干活。

    CycliBarrier 是基于 ReentrantLock实现的,看看它的构造器和属性。

    private static class Generation {
        boolean broken = false;
    }
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition(); //等待开门的线程队列
    /** The number of parties */
    private final int parties;//在触发屏障之前必须调用await的线程数
    /* The command to run when tripped */
    private final Runnable barrierCommand;//当屏障被触发时执行的命令
    /** The current generation */
    private Generation generation = new Generation(); //当前代
    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;//仍在等待的线程数量。每个generation从parties倒计时到0。它会在创建新的generation或generation损坏时重置为parties。
    
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    

    可以看到一个Generation内部类,因为barrier是可以循环使用的,generation代表barrier的一次使用,暂且称为代。每当barrier开门或重置时,generation就会发生变化。

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();//上一代完成,唤醒所有上一代
        // set up next generation
        count = parties;//重置count
        generation = new Generation();
    }
    

    内部的broken代表是否被破坏,ture代表barrier已损坏。

    //将当前barrier的generation设置为已损坏并唤醒所有当代线程。仅在持有锁时调用。
    private void breakBarrier() {
        generation.broken = true;
        count = parties;//重置count
        trip.signalAll();//唤醒所有trip中的线程
    }
    

    barrier的await方法可中断,当线程被中断且没有达到开门条件,会破坏并重置Barrier;其他线程调用reset方法重置barrier时也会破坏并重置Barrier;当线程发现barrier已经被破坏就会抛BrokenBarrierException异常。
    当最后一个线程到达时,barrier会开门(唤醒所有线程)并进入下一代,await返回线程完成的顺序,值越大代表越先完成。
    核心方法主要实现在dowait方法:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)//如果当前barrier已损坏,抛BrokenBarrierException
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {//调用该方法的线程有中断标记,破坏barrier后中断
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                int index = --count;
                if (index == 0) {  // 等于0时代表所有线程执行完毕,开门
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {//阻塞直到开门、损坏、中断或超时
                    try {
                        if (!timed)
                            trip.await(); //直接阻塞
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {//barrier开门或者reset几乎同时线程中断,if条件不满足,就认为barrier开门或者reset行为先完成
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)//调用了reset
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;//线程到达的索引顺序
    
                    if (timed && nanos <= 0L) {//超时
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    

    重置barrier的方法:

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // 破坏barrier
            nextGeneration(); // 重置
        } finally {
            lock.unlock();
        }
    }
    

    相关文章

      网友评论

          本文标题:AQS源码浅析(10)——CyclicBarrier

          本文链接:https://www.haomeiwen.com/subject/npqssrtx.html