美文网首页Java系统架构师
并发工具CyclicBarrier源码分析及应用

并发工具CyclicBarrier源码分析及应用

作者: 猿灯塔 | 来源:发表于2020-03-31 08:40 被阅读0次

    原创申明:本文由公众号【猿灯塔】原创,转载请说明出处标注

    今天呢!灯塔君跟大家讲:                      

    并发工具CyclicBarrier源码分析及应用

    一.CyclicBarrier简介

    1.简介

    CyclicBarrier是一个同步的辅助类,允许一组线程相互之间等待,达到一个共同点,再继续执行。 CyclicBarrier(循环屏障) 直译为可循环使用(Cyclic)的屏障(Barrier)。它可以让一组线程到 达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才 会继续工作。

    JDK中的描述:

    A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released. CyclicBarrier是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。 在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用CyclicBarrier很有帮 助。这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以重新使用的

    2.运行机制

    二.CyclicBarrier结构图

    三.CyclicBarrier方法说明

    1.CyclicBarrier(parties)

    初始化相互等待的线程数量的构造方法 

    2.CyclicBarrier(parties,RunnablebarrierAction)

    初始化相互等待的线程数量的构造方法以及屏障线程的构造方法 屏障线程的运行时机:等待的线程数量 = parties,CyclicBarrier打开屏障之前 举例:在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算

    3、getParties() 

    获取CyclicBarrier打开屏障的线程数量,也成为方数 

    4、getNumberWaiting() 

    获取真在CyclicBarrier上等待的线程数量

    5、await()

    在CyclicBarrier上进行阻塞等待,直到发生以下情形之一: 

    a.在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。 

    b.当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。 

    c.其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。 

    d.其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。 

    e.其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并 停止等待,继续执行。 

    6、await(timeout,TimeUnit)

    在CyclicBarrier上进行限时的阻塞等待,直到发生以下情形之一: 

    a.在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。 

    b.当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。 

    c.当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。 

    d.其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。 

    e.其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。 

    f.其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并 停止等待,继续执行。 

    7、isBroken

    获取是否破损标志位broken的值,此值有以下几种情况: 

    a.CyclicBarrier初始化时,broken=false,表示屏障未破损。 

    b.如果正在等待的线程被中断,则broken=true,表示屏障破损。 

    c.如果正在等待的线程超时,则broken=true,表示屏障破损。 

    d.如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。 

    8、reset 

    使得CyclicBarrier回归初始状态,直观来看它做了两件事: a.如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。 b.将是否破损标志位broken置为false。

    四.源码分析

    首先看一下CyclicBarrier内部声明的一些属性 

    /**用于保护屏障入口的锁*/

    private final ReentrantLock lock = new 

    ReentrantLock(); 

    /**线程等待条件 */ 

    private final Condition trip = lock.newCondition();

     /** 记录等待的线程数 */ 

    private final int parties; 

    /**所有线程到达屏障点后,首先执行的命令 

    */ private final Runnable barrierCommand; 

    private Generation generation = new Generation(); 

    /**实际中仍在等待的线程数,每当有一个线程到达屏障点,

    count值就会减一;当一次新的运算开始后, 

    count的值被重置为parties*/ 

    private int count;

    其中,Generation是CyclicBarrier的一个静态内部类

    它只有一个boolean类型的属性,具体代码如下:

    private static class Generation { Generation() {} // prevent access constructor creation boolean broken; // initially false }

    当使用构造方法创建CyclicBarrier实例的时候

    就是给上面这些属性赋值

    //创建一个CyclicBarrier实例,parties指定参与相互等待的线程数, 

    //barrierAction指定当所有线程到达屏障点之后,

    首先执行的操作,该操作由最后一个进入屏障点线程执行. 

    public CyclicBarrier(int parties, 

    Runnable barrierAction) { 

    if (parties <= 0) throw new IllegalArgumentException(); 

    this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }

    //创建一个CyclicBarrier实例,parties指定

     参与相互等待的线程数 

    public CyclicBarrier(int parties) { this(parties, null); }

    CyclicBarrier.await方法调用CyclicBarrier.dowait()

    每次调用await()都会使计数器-1,当减少到0 时就会

    唤醒所有的线程 ,当调用await()方法时,当前线程已经

    到达屏障点,当前线程阻塞进入休眠状态

    //该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态 

    //直到所有线程都到达屏障点,当前线程才会被唤醒 

    public int await() throws InterruptedException, BrokenBarrierException { try {return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }

    当前线程已经到达屏障点,当前线程阻塞进入休眠状态

    //该方法被调用时表示当前线程已经到达屏障点.当前   线程阻塞进入休眠状态 

    //在timeout指定的超时时间内,等待其他参与线程

      到达屏障点 

    //如果超出指定的等待时间则抛TimeoutException异常,如果该时间小于等于零,则此方法根本不会等待 

    public int await(long timeout, TimeUnit unit) 

    throws InterruptedException, BrokenBarrierException, 

    TimeoutException { return dowait(true, unit.toNanos(timeout)); }

    dowait()方法

    private int dowait(boolean timed, long nanos) 

    throws InterruptedException, BrokenBarrierException, TimeoutException 

    { //使用独占资源锁控制多线程并发进入这段代码

    final ReentrantLock lock = this.lock; 

    //独占锁控制线程并发访问 

    lock.lock(); 

    try {final Generation g = generation; 

    if (g.broken) throw new BrokenBarrierException(); 

    //检查当前线程是否被中断 

    if (Thread.interrupted()) { 

    //如果当前线程被中断会做以下三件事 

    //1.打翻当前栅栏 

    //2.唤醒拦截的所有线程 

    //3.抛出中断异常 

    breakBarrier(); 

    throw new InterruptedException(); }

    //每调用一次await()方法,计数器就减一 

    int index = --count; 

    //计数器的值减为0则需唤醒所有线程并转换到下一代 

    if (index == 0) { // tripped boolean ranAction = false; 

    try {

    //如果在创建CyclicBarrier实例时设置了barrierAction,则先执行 

    barrierAction inal Runnable command = barrierCommand;

     if (command != null) 

         command.run(); 

         ranAction = true; 

    //当所有参与的线程都到达屏障点,为唤醒所有处于

      休眠状态的线程做准备工作 

    //需要注意的是,唤醒所有阻塞线程不是在这里 nextGeneration(); 

    return 0; 

    } finally { 

    //确保在任务未成功执行时能将所有线程唤醒 

    if (!ranAction)

     breakBarrier(); 

    }

    // loop until tripped, broken, 

    interrupted, or timed out 

    //如果计数器不为0则执行此循环

    for (;;) 

    { try {

    //根据传入的参数来决定是定时等待还是非定时等待

    if (!timed) 

    //让当前执行的线程阻塞,处于休眠状态 trip.await(); 

    else if (nanos > 0L) 

    //让当前执行的线程阻塞,在超时时间内处于休眠状态 

    nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { 

    //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程

    if (g == generation && ! g.broken) { breakBarrier(); 

    throw ie; } 

    else { 

    // We're about to finish waiting even 

       if we had not

    // been interrupted, so this interrupt 

       is deemed to 

    // "belong" to subsequent execution. Thread.currentThread().interrupt(); } }

    //如果线程因为打翻栅栏操作而被唤醒则抛出异常 

    if (g.broken) throw new BrokenBarrierException(); 

    if (g != generation) return index; 

    //如果线程因为时间到了而被唤醒则打翻栅栏

      并抛出异常 

    if (timed && nanos <= 0L) { 

    breakBarrier(); 

    throw new TimeoutException(); } 

    finally { lock.unlock(); } }

    每次调用await方法都会使内部的计数器临时变量-1

    当减少到0时,就会调用nextGeneration方法

    private void nextGeneration() { 

    // signal completion of last generation trip.signalAll(); 

    // set up next generation 

    count = parties; generation = new Generation(); }

    在这里唤醒所有阻塞的线程 

    提醒:在声明CyclicBarrier的时候还可以传一个Runnable的实现类,当计数器减少到0时,

    会执行该 实现类 到这里CyclicBarrier的

    实现原理基本已经都清楚了下面来深入源码分析

    一下线程阻塞代码 trip.await()和线程唤醒trip.signalAll()的实现。

    //await()是AQS内部类ConditionObject中的方法 public final void await() throws InterruptedException { 

    //如果线程中断抛异常 

    if (Thread.interrupted()) 

       throw new InterruptedException(); 

    //新建Node节点,并将新节点加入到Condition

      等待队列中 

    //Condition等待队列是AQS内部类

    ConditionObject实现的,ConditionObject

    有两个属性,

    分别是firstWaiter和lastWaiter

    都是Node类型 

    //firstWaiter和lastWaiter分别用于代表Condition等待队列的头结点和尾节点 Node node = addConditionWaiter(); 

    //释放独占锁,让其它线程可以获取到dowait()

      方法中的独占锁 

    int savedState = fullyRelease(node); 

    int interruptMode = 0; 

    //检测此节点是否在资源等待队列(AQS同步队列)中, //如果不在,说明此线程还没有竞争资源锁的权利,

    此线程继续阻塞,直到检测到此节点在 资源等待队列上(AQS同步队列)中

    //这里出现了两个等待队列,分别是Condition等待

      队列和AQS资源锁等待队列(或者说是 同步队列) //Condition等待队列是等待被唤醒的线程队列

     AQS资源锁等待队列是等待获取资源锁 的队列 

    while (!isOnSyncQueue(node)) 

    //阻塞当前线程,当前线程进入休眠状态,可以看到

      这里使用LockSupport.park阻 塞当前线程 LockSupport.park(this); 

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode 

    = REINTERRUPT; 

    if (node.nextWaiter != null) 

    // clean up if cancelled unlinkCancelledWaiters(); 

    if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

    //addConditionWaiter()是AQS内部类ConditionObject中的方法 

    private Node addConditionWaiter() 

    { Node t = lastWaiter; 

    // 将condition等待队列中,节点状态不是

    CONDITION的节点,从condition等待队列中移除

    if (t != null && t.waitStatus != Node.CONDITION) 

    { unlinkCancelledWaiters(); 

    t = lastWaiter; }

    //以下操作是用此线程构造一个节点,并将之加入到condition等待队列尾部 

    Node node = new Node(Thread.currentThread(), Node.CONDITION); 

    if (t == null) 

    firstWaiter = node; 

    else

    t.nextWaiter = node; 

    lastWaiter = node; return node; }

    //signalAll是AQS内部类ConditionObject中的方法

    public final void signalAll() {

     if (!isHeldExclusively()) 

    throw new IllegalMonitorStateException(); //Condition等待队列的头结点

    Node first = firstWaiter; 

    if (first != null) doSignalAll(first); }

    private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do {Node next = first.nextWaiter; first.nextWaiter = null;

    //将Condition等待队列中的Node节点按之前顺序

    都转移到了AQS同步队列中 transferForSignal(first); 

    first = next; 

    } while (first != null); }

    final boolean transferForSignal

    (Node node) { 

    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 

    return false; 

    //这里将Condition等待队列中的Node节点插入到AQS同步队列的尾部 

    Node p = enq(node); 

    int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

    //ReentrantLock#unlock()方法 

    public void unlock() { 

    //Sync是ReentrantLock的内部类,继承自AbstractQueuedSynchronizer,

    它是 ReentrantLock中公平锁和非公平锁的基础实现 sync.release(1); }

    public final boolean release(int arg) { 

    //释放锁 

    if (tryRelease(arg)) { 

    //AQS同步队列头结点 

    Node h = head; if (h != null && h.waitStatus != 0) 

    //唤醒节点中的线程 

    unparkSuccessor(h); return true; }

    return false; }

    private void unparkSuccessor(Node node) 

    { int ws = node.waitStatus; 

    if (ws < 0) compareAndSetWaitStatus(node, ws, 0); 

    Node s = node.next; 

    if (s == null || s.waitStatus > 0) 

    { s = null; for (Node t = tail; 

    t != null && t != node; t = t.prev) 

    if (t.waitStatus <= 0) s = t; }

    if (s != null) //唤醒阻塞线程 LockSupport.unpark(s.thread); }

    365天干货不断,可以微信搜索「猿灯塔」第一时间阅读,回复【资料】【面试】【简历】有我准备的一线大厂面试资料和简历模板

    相关文章

      网友评论

        本文标题:并发工具CyclicBarrier源码分析及应用

        本文链接:https://www.haomeiwen.com/subject/qzkvuhtx.html