1. 思路分治
- 将一份复杂逻辑拆分成一个个简单逻辑并发执行,进行汇总,得到最终结果。MapReduce就是这种思路
- 伪代码
if(任务很小){
直接计算得到结果
}else{
分拆成N个子任务
调用子任务的fork()进行计算
调用子任务的join()合并计算结果
}
2. API
![](https://img.haomeiwen.com/i9547570/42c867b99b6a7cd4.png)
image.png
- ForkJoinTask 常用子类
- RecursiveAction 无返回值的任务
- RecursiveTask 有返回值的任务
- CountedCompleter 完成任务后将触发其他任务
- ForkJoinPool ForkJoinTask 需要通过 ForkJoinPool 来执行
- execute 异步,无返回结果
- submit 异步,有返回结果 (返回Future<T>)
- invoke 同步,有返回结果 (会阻塞)
3.demo
@Slf4j
public class InsertDataTask extends RecursiveTask<Integer> {
private GeneralService generalService;
private List<String> list;
private static int MAX = 5;
private int start;
private int end;
public InsertDataTask(int start, int end,List<String> list,GeneralService generalService) {
this.list = list;
this.start = start;
this.end = end;
this.generalService = generalService;
}
@Override
protected Integer compute() {
int sum = 0;
if(end - start < MAX ){
for(int i = start; i <= end; i ++){
sum ++;
generalService.excuse(list.get(i));
}
return sum;
} else {
int middle = (start + end) / 2;
InsertDataTask left = new InsertDataTask(start,middle,list, generalService);
InsertDataTask right = new InsertDataTask(middle + 1,end,list, generalService);
left.fork();
right.fork();
return left.join() + right.join();
}
}
public static void main(String[] strs){
List<Map<String, Object>> dataList = dataMap.getValue();
// 5000组以上数据分治才有意义
if(dataList.size() > 5){
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future<Integer> future =forkJoinPool.submit(new InsertDataTask(0, insertSqlList.size() - 1,insertSqlList,generalService));
Integer total = future.get();
long end = System.currentTimeMillis();
log.debug("子表插入时间:"+(end-start) / 1000 + " s");
} else {
// 正常执行
}
}
}
网友评论