@[TOC](高并发(9)- 线程并发工具类-CyclicBarrier)
前言
上篇文章讲解了线程的并发工具类之CountDownLatch,本文就来讲解CyclicBarrier并发工具类
CountDownLatch其实就是一组计数器,只有计数器为0,等待的线程才会执行。
CyclicBarrier则是类似于栅栏,只有全部的线程都完成了之后才会往下执行。
什么是CyclicBarrier
什么是CyclicBarrier,从字面来理解就是“循环的栅栏”,其实也就是一个可重复使用的栅栏。作用就是让所有的线程都执行完毕才会继续往下执行。指定栅栏的个数。
比如组装一台电脑,必须等所有的零件都到齐才可以组装完成并且开机。这就是CyclicBarrier的含义。
CyclicBarrier流程图
如图所示:我们定义了CyclicBarrier的数量为3,Ta、Tb、Tc三个线程就要都执行玩await()方法才可以继续往下走。
- Ta先执行完了任务,调用了await()方法,这时候Tb、Tc还没有执行完,所有Ta就必须在这里等待Tb、Tc。
- 随后Tb也执行完了,调用了await()方法,但是Tc还没有执行完,所有Ta和Tb要一起等待Tc。
- 最后Tc也执行完了,调用了await()方法,这时候三个栅栏都满足了,所以Ta、Tb、Tc都可以继续往下执行。
注意
CyclicBarrier有个barrierAction,作用是在线程达到栅栏时,会执行一次barrierAction方法,可以在barrierAction方法中添加我们需要的业务。
CyclicBarrier实现
构造方法:
//创建一个指定长度的栅栏。
public CyclicBarrier(int parties)
//创建一个指定长度的栅栏,并且在都达到栅栏的时候做一个任务
public CyclicBarrier(int parties, Runnable barrierAction)
CyclicBarrier(int parties)构造方法是创建指定数量的栅栏,每个线程使用await()方法告诉CyclicBarrier我到了栅栏,然后当前线程被阻塞。
CyclicBarrier(int parties, Runnable barrierAction),在于所有线程到了栅栏处,执行barrierAction,方便处理更复杂的业务场景。
普通方法
//线程达到栅栏
public int await() throws InterruptedException, BrokenBarrierException
线程到达了栅栏出,调用await方法的线程告诉CyclicBarrier自己已经到达了栅栏,然后阻塞当前线程,知道所有参与线程都调用了await方法。
代码实现
/**
* @version 1.0
* @Description CyclicBarrierDemo
* @Author wb.yang
*/
public class CyclicBarrierDemo {
/**
* 创建栅栏数量为5,并且创建一个栅栏完成后需要执行的任务
*/
private static CyclicBarrier barrier = new CyclicBarrier(5, new CollectThread());
/**
* 存放子线程工作结果的容器
*/
private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();
/**
* 负责屏障开放以后的工作,处理子线程中的数据
*/
private static class CollectThread implements Runnable {
@Override
public void run() {
StringBuilder result = new StringBuilder();
for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
result.append("[").append(workResult.getValue()).append("]");
}
System.out.println(" 这个结果是 = " + result);
System.out.println("干自己的工作了.......");
}
}
/**
* 工作子线程生成一个随机数,放入工作容器中
*/
private static class SubThread implements Runnable {
@Override
public void run() {
long id = Thread.currentThread().getId();
resultMap.put(Thread.currentThread().getId() + "", id);
Random r = new Random();
try {
if (r.nextBoolean()) {
Thread.sleep(2000 + id);
System.out.println("Thread_" + id + " ....做某些事情");
}
System.out.println(id + "....正在等待");
barrier.await();
Thread.sleep(1000 + id);
System.out.println("Thread_" + id + " ....开始工作了 ");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for (int i = 0; i <= 4; i++) {
Thread thread = new Thread(new SubThread());
thread.start();
}
}
}
从上图代码中看出了,我们先定义了一个数量为5的栅栏,然后又给他指定了一任务,在任务达到了栅栏处执行。然后用一个容器来存在每个子线程数据,然后在执行任务中做我们自己的处理。然后有一个工作子线程,其中先做一些操作,然后就开始await等待,等到所有线程都到了栅栏处,然后执行之后的业务操作。
运行结果图我们从运行结果中也可以看出,五个工作线程先后做了自己的业务操作,然后开始等待栅栏,等所有线程都到大了栅栏,就开始执行我们指定的任务,然后每个线程在执行自己剩余的业务操作。
网友评论