美文网首页
同步屏障 - CyclicBarrier

同步屏障 - CyclicBarrier

作者: zbsong | 来源:发表于2020-07-06 19:17 被阅读0次

CyclicBarrier 是什么?

让一组线程到达一个屏障后被阻塞,直到最后一个线程到达屏障时,屏障才会“开门”,所有被屏障阻塞的线程继续执行。

CyclicBarrier 构造

CyclicBarrier(int parties)parties表示屏障拦截的线程数量,当线程调用await()方法就是告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier(int parties,Runnable barrierAction)
用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

CyclicBarrier 方法

类型 方法 描述
int await() 等待直到parties个线程都调用了await()
int await(long timeout, TimeUnit unit) 等待直到parties个线程都调用了await() 或者过了超时时间
int getNumberWaiting() 获取CyclicBarrier当前在等待的线程数量
int getParties() 获取CyclicBarrier拦截线程数量
boolean isBroken() 获取阻塞的线程是否被中断
int reset() 将屏障设置为初始状态

CyclicBarrier(int parties)例子

public class CyclicBarrierTest {
    //定义一个屏障,设置拦截两个线程
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //通知CyclicBarrier我已经到达屏障,线程阻塞
                    cyclicBarrier.await();
                    System.out.println("1");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("2");
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //通知CyclicBarrier我已经到达屏障,线程阻塞
                    cyclicBarrier.await();
                    System.out.println("3");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

输出结果

2
3
1

因为我们定义的CyclicBarrier是拦截两个线程,所以第一个线程执行了await()后开始阻塞,然后继续执行,输出“2”,当第二个线程执行了await()方法后,第二个线程达到屏障,屏障打开“大门”,两个线程继续执行,输出“3”和“1”,也有一种可能是先输出“1”然后输出“3”,这是因为主线程和子线程的调度是由CPU决定的,都有可能先执行。

CyclicBarrier(int parties,Runnable barrierAction)例子

public class CyclicBarrierTest2 {
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new A());

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                    System.out.println("1");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("2");
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                    System.out.println("3");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    static class A implements Runnable {
        @Override
        public void run() {
            System.out.println("4");
        }
    }
}

结果

2
4
3
1

根据上面的例子稍微进行改造,当第二个线程达到屏障后,优先执行了A,然后阻塞线程才继续执行。

CyclicBarrier应用场景

可以用于多线程计算数据,最后合并计算结果的场景。

例子

创建一个屏障,设置拦截线程数为10
假设每个线程计算结果返回1
最终十个线程计算结果相加得到的结果应该为10

package com.sy.thread.example;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * Description: thread
 * @author songyu
 */
public class CyclicBarrierTest3 implements Runnable{
    /**
     * 创建屏障
     */
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(10,this::run);
    /**
     * 保存每个线程执行的结果
     */
    private ConcurrentHashMap<String,Integer> concurrentHashMap = new ConcurrentHashMap<>();
    private void calculationData() {
        //使用线程不规范,实际使用时可以使用ThreadPoolExecutor
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    concurrentHashMap.put(Thread.currentThread().getName(),1);
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    @Override
    public void run() {
        int result = 0;
        for (Map.Entry<String, Integer> map : concurrentHashMap.entrySet()) {
            result = result + map.getValue();
        }
        System.out.println("最终计算结果:" + result);
    }

    public static void main(String[] args) {
        CyclicBarrierTest3 c = new CyclicBarrierTest3();
        c.calculationData();
    }

}

结果

最终计算结果:10

相关文章

网友评论

      本文标题:同步屏障 - CyclicBarrier

      本文链接:https://www.haomeiwen.com/subject/fcwcqktx.html