CyclicBarrier和CountDownLatch
CyclicBarrier :字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
下面的代码是开启五个线程去生成数据,等生成数据结束再启动一个线程去汇总生成的数据
//五个线程去生成数据,等三个线程生成数据完成再启动一个线程去汇总结果
public class CyclicBarrierDemo {
private static ConcurrentHashMap<String,Integer> map=new ConcurrentHashMap<String,Integer>();
static class Producer implements Runnable{
private CyclicBarrier cyclicBarrier;
public Producer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
Random random=new Random();
int num=random.nextInt(1000)+1000;
System.out.println(Thread.currentThread().getId()+"product "+num);
map.put(Thread.currentThread().getId( )+"",num);
try {
Thread.sleep(1000);
//生成数据后会在这里等待(阻塞),直到所有的线程都生成数据结束再放行
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
private static class Consumer implements Runnable{
@Override
public void run() {
int sum=0;
for (Map.Entry<String,Integer> entry:map.entrySet()){
sum+=entry.getValue();
}
System.out.println("consumer caculate sum ="+sum);
}
}
public static void main(String[] args) {
//这个构造方法的意思是等五个线程都到达屏障之后,开始执行Consumer线程的run方法
CyclicBarrier cyclicBarrier=new CyclicBarrier(5,new Consumer());
Producer producer=new Producer(cyclicBarrier);
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(producer);
thread.start();
}
}
}
执行结果
11product 1890
12product 1849
13product 1314
14product 1346
15product 1658
consumer caculate sum =8057
CountDownLatch 允许一个或多个线程等待其他线程完成操作。CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。
上面的场景同样可以使用CountDownLatch来实现:
public class CountDownLatchDemo {
static class Producer implements Runnable{
private CountDownLatch countDownLatch;
private ConcurrentHashMap<String,Integer> map;
public Producer(CountDownLatch countDownLatch,ConcurrentHashMap<String,Integer> map) {
this.countDownLatch = countDownLatch;
this.map=map;
}
@Override
public void run() {
Random random=new Random();
int num=random.nextInt(1000)+1000;
System.out.println(Thread.currentThread().getId()+"product "+num);
map.put(Thread.currentThread().getId( )+"",num);
//生产完了就去减1
countDownLatch.countDown();
}
}
static class Consumer implements Runnable{
private ConcurrentHashMap<String,Integer> map;
private CountDownLatch countDownLatch;
public Consumer(ConcurrentHashMap<String, Integer> map, CountDownLatch countDownLatch) {
this.map = map;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
//消费者一直阻塞,直到生产者生产完毕,把countDownLatch的count减到0
countDownLatch.await();
int sum=0;
for (Map.Entry<String,Integer> entry:map.entrySet()){
sum+=entry.getValue();
}
System.out.println("consumer calculate sum ="+sum );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int producorThreadSize=5;
CountDownLatch countDownLatch=new CountDownLatch(producorThreadSize);
ConcurrentHashMap map=new ConcurrentHashMap();
Producer producer=new Producer(countDownLatch,map);
for (int i = 0; i < producorThreadSize; i++) {
new Thread(producer).start();
}
new Thread(new Consumer(map,countDownLatch)).start();
}
}
执行结果:
13product 1978
15product 1752
14product 1449
12product 1373
11product 1318
consumer calculate sum =7870
CyclicBarrier和CountDownLatch的区别:
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,CountDownLatch.await一般阻塞主线程,所有的工作线程执行countDown,而CyclicBarrierton通过工作线程调用await从而阻塞工作线程,直到所有工作线程达到屏障。
网友评论