一、前言
前段时间去斗鱼面试被刷一直耿耿于怀,没办法自己太过菜鸡 ( Ĭ ^ Ĭ )。记得当时面试官问我“假如有三个询价接口,需要获取到最低报价?”吐露个心声,我毕业至今工作中没什么机会用到多线程,处理并发问题,虽然有时间就会看看这个java进阶模块。然后,就答了个“Callable接口,主线程阻塞(get)”什么的。结果可想而知,“哎,下一个!”(。•́︿•̀。)
最近提及这个话题,心一横一定要弄明白。然后CyclicBarrier就这样出现在我的视野里...
二、CyclicBarrier
CyclicBarrier意为“可循环的屏障”。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障解除,所有被阻塞的线程才会继续。CyclicBarrier默认的构造方法是CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每个线程调用它的await()
方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
另外还有一个构造方法CyclicBarrier(int parties, Runnable barrierAction)
,第二个参数barrierAction,是一个Runnable接口,相当于一个回调方法,是在最后一个线程到达屏障时触发。这个很关键,我们很容易用这个方法处理多线程结果合并计算的问题。
这个类和CountDownLatch类似,但是CyclicBarrier计数可以重置,意为循环使用( ̄▽ ̄)~*。但这不是本文研究的重点。接下来还是上干货,实践是检验真理的唯一标准。
三、编码实践
在了解了CyclicBarrier后,可以构思怎么实现这个需求了。
- 新建一个CyclicBarrier共享变量,还需要一个询价结果list,这个放在barrierAction里面计算用;
- 编写询价线程Runnable,在
run()
模拟网络耗时,随机生成报价,并且将询价结果result添加到共享变量list中,注意需要加锁; - 主线程创建一个线程池,模拟三个线程操作,编写barrierAction里面的计算代码;
1、询价结果
/**
* 询价结果
*/
class Result {
private String site;
private int price;
public String getSite() {
return site;
}
public void setSite(String site) {
this.site = site;
}
int getPrice() {
return price;
}
void setPrice(int price) {
this.price = price;
}
@Override
public String toString() {
return "Result{" +
"site='" + site + '\'' +
", price=" + price +
'}';
}
}
2、询价任务线程
/**
* 询价接口
*/
class AskTask implements Runnable {
private CyclicBarrier barrier;
private final List<Result> results;
AskTask(CyclicBarrier barrier, List<Result> results) {
this.barrier = barrier;
this.results = results;
}
@Override
public void run() {
Result result = null;
try {
// 模拟网络耗时获取数据
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 3000));
result = new Result();
result.setPrice((new Random().nextInt(1000) + 500));
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (results) {
if (result != null){
/* 模拟报价站点名称 */
String[] sites = {"苏宁", "天猫", "京东"};
result.setSite(sites[results.size()]);
// 添加到共享变量list中
results.add(result);
}
}
try {
System.out.println("询价结束,等待其它线程..." + result);
// 抵达屏障
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
3、main方法
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
final List<Result> results = new ArrayList<>();
CyclicBarrier barrier = new CyclicBarrier(3,
new Runnable() {
@Override
public void run() {
if (!results.isEmpty()){
Result min = results.get(0);
for (Result result : results){
if (result.getPrice() < min.getPrice()) min = result;
}
System.out.println("最低报价是:" + min);
}
}
});
// 模拟三个询价接口,采用线程池管理
for (int i = 0; i < 3; i++) {
AskTask c = new AskTask(barrier, results);
service.submit(c);
}
service.shutdown();
}
4、测试
我们运行下程序:
询价结束,等待其它线程...Result{site='苏宁', price=823}
询价结束,等待其它线程...Result{site='天猫', price=1284}
询价结束,等待其它线程...Result{site='京东', price=1221}
最低报价是:Result{site='苏宁', price=823}
可以运行多次,因为price是随机生成的。可以观察主线程阻塞情况,总耗时,佐证我们程序的效率。
网友评论