美文网首页
CyclicBarrier源码分析

CyclicBarrier源码分析

作者: 笔记本一号 | 来源:发表于2020-09-05 13:54 被阅读0次

不多BB开门见山
CyclicBarrier作用主要是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障踩会开门,所有被屏障拦截的线程才会继续运行

public class CyclicTest {
    private static ExecutorService executorService = Executors.newCachedThreadPool();
    private static AtomicInteger count= new AtomicInteger();
   private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
        @Override
        public void run() {
            if (count.get()==0) {
                System.out.println("4个人到齐了,可以打麻将了");
            }else {
                System.out.println("大家还想打一局");
            }
        }
    });

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 8; i++) {
            Thread.sleep(100);
            executorService.submit(() -> {
                try {
                    if (count.get()==0) {
                        System.out.println(String.format("%s:%s", Thread.currentThread().getName(), "来打麻将了"));
                        cyclicBarrier.await();
                        System.out.println(String.format("%s:%s", Thread.currentThread().getName(), "开始摸牌"));
                        count.incrementAndGet();
                    }else {
                        System.out.println(String.format("%s:%s", Thread.currentThread().getName(), "想在玩一局"));
                        cyclicBarrier.await();
                        System.out.println(String.format("%s:%s", Thread.currentThread().getName(), "第二局开始摸牌"));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
        while (!executorService.isTerminated()) {

        }
        System.out.println("这局结束后都累了,大家回去睡觉了");
    }

}
image.png

我传入cyclicBarrier的屏障点的数量是4所以必须得四个线程调用cyclicBarrier.await()才可以往下执行,每一个线程调用cyclicBarrier.await()后达到屏障点的数量就会减1,直到屏障点为0后先执行传入的Runnable对象然后才继续执行下面的代码,也就是四个线程达到了屏障才能执行下面的代码,当第一批的4个线程达到屏障后,第二批也需要4个线程达到,证明了cyclicBarrier的屏障是可以重复起效果的,线程数量每达到一次屏障点就会初始化为最初值

 public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
  public CyclicBarrier(int parties) {
        this(parties, null);
    }
   private static class Generation {
        boolean broken = false;
    }
 /*可重入锁*/
    private final ReentrantLock lock = new ReentrantLock();
    /*等待队列*/
    private final Condition trip = lock.newCondition();
    /*参与等待的线程数量*/
    private final int parties;
    /*当所有线程达到屏障点之后,首先执行的任务*/
    private final Runnable barrierCommand;
    /*实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减少一;
    当一次新的运算开始后,count会被重置为parties*/
    private int count;
    /*表示当前屏障的状态,true被破坏,false没有被破坏*/
    private Generation generation = new Generation();

核心方法await
该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态
直到所有线程都到达屏障点,当前线程才会被唤醒

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

dowait()方法
timed:true表示当前线程进入timed_waiting状态,false表示当前线程进入waiting状态。
nanos:线程超时等待的时长
方法的主要步骤
1、对当前操作进行加锁
2、判断当前屏障是否被打破或者当前线程被中断,抛出异常。
3、若步骤二均未发生,判断当前线程是否是到达屏障点的最后一个线程,如果是,执行BarrierCommand(如果有),唤醒全部在Condition队列上等待的线程,并重置CyclicBarrier的状态,更新generation。
4、若当前线程不是最后一个到达屏障点的线程,那么当前线程被挂起,直至被唤醒/中断/超时。

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
        TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final CyclicBarrier.Generation g = generation;
        /*如果当前屏障被破坏,抛出异常*/
        if (g.broken)
            throw new BrokenBarrierException();
        
        /*如果当前调用await函数的线程被中断,调用breakBarrier函数并抛出异常*/
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        /*由于当前线程已经达到屏障点,那么等待到达屏障点的线程数量减少一*/
        int index = --count;
        /*index为0说明所有的线程都到达了屏障点*/
        if (index == 0) {  // tripped
            /*判断是否有优先执行的任务*/
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                /*有优先执行的任务,那么由当前调用await()的线程执行该任务*/
                if (command != null)
                    command.run();
                ranAction = true;
                /*执行唤醒全部线程的操作*/
                nextGeneration();
                return 0;
            } finally {
                /*说明在执行barrierCommand的时候发生了异常*/
                if (!ranAction)
                    breakBarrier();
            }
        }

        /*若所有的线程还未全部到达屏障点*/
        for (;;) {
            try {
                /*判断阻塞方式,并将当前线程加入Condition队列中*/
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                //若遭遇中断
                //这个中断异常是从Condition中抛出来的,对应THROW_IN
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    /*将当前异常直接抛出*/
                    throw ie;
                } else {
                    /*
                     *  这里两种情况:
                     *  1. g!=generation,说明被reset过
                     *  2. g==generation&&g.broken, 说明屏障被破坏了 
                     *  对于第二种情况,当前屏障已经被破坏,说明这次中断是属于这一轮等待到达屏障点的,接下来的操作会直接抛出异常(对应if(g.broken))
                     *  对于第一种情况,由于g!=generation, 说明reset了或者这一轮到达屏障点的任务已经完成,那么接下来的操作会直接退出循环(对应if(g!=generation)),所以这次中断是属于后续执行的操作,我们把中断外抛。
                    */      
                    Thread.currentThread().interrupt();
                }
            }
            /*判断屏障是否被破坏*/
            if (g.broken)
                throw new BrokenBarrierException();
            /*
             * 若当前线程被唤醒发现generation发生改变
             * 这里有两种情况:
             * 1. CyclicBarrier被reset
             * 2. 说明最后一个线程到达屏障点
             */
            if (g != generation)
                return index;
            /*当前线程等待超时*/
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

nextGeneration()主要做的就是唤醒等待的线程然后对屏障点进行初始化

private void nextGeneration() {
    /*唤醒Condition队列中的所有线程*/
    trip.signalAll();
    /*更新count为初始值*/
    count = parties;
    /*更新generator*/
    generation = new CyclicBarrier.Generation();
}

breakBarrier(): 唤醒所有Condition队列中的线程,并重置CyclicBarrier的成员变量count,并且把generation的broken变量设置为true,表示当前屏障被破坏。

private void breakBarrier() {
    /*表示当前屏障被破坏*/
    generation.broken = true;
    /*更新count为初始值*/
    count = parties;
    /*唤醒Condition队列中的所有线程*/
    trip.signalAll();
}
/*重置CyclicBarrier*/
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

/*判断屏障是否被破坏*/
public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

相关文章

网友评论

      本文标题:CyclicBarrier源码分析

      本文链接:https://www.haomeiwen.com/subject/hvzxektx.html