ForkJoinPool大任务拆解操作记录
1概述
Fork/Join执行任务的核心在于compute()方法:先判断任务是否足够小,如果足够小就直接计算;否则,把自身的任务一拆为二,分别计算,再合并两个子任务的结果。
1.1编写任务处理逻辑
package com.tech.ability.myjacob;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.RecursiveTask;
/**
 * Fork/join task that sums the elements of {@code array} in the half-open
 * index range {@code [start, end)}.
 *
 * <p>Ranges of at most {@link #THRESHOLD} elements are summed directly; larger
 * ranges are split at the midpoint into two subtasks whose results are added.
 * Every directly computed range also writes a small report file (the indices it
 * covered and its partial sum) as a visible trace of which leaf tasks ran.
 *
 * <p>Header metadata carried over from the original file: project
 * TechnicalAbilityToolBox, file MySumTask, created 2019/7/8 22:39 with
 * IntelliJ IDEA.
 *
 * @author kikop
 * @version 1.0
 */
public class MySumTaskLong extends RecursiveTask<Long> {

    /** Ranges with at most this many elements are summed without further splitting. */
    static final int THRESHOLD = 100;

    /** Shared input data; subtasks only read from it. */
    long[] array;

    /** First index (inclusive) of the range this task is responsible for. */
    int start;

    /** End index (exclusive) of the range this task is responsible for. */
    int end;

    /** Path prefix of the per-leaf report files; the zero-padded start index and ".doc" are appended. */
    String strFileNamePrefix = "D:/myjacobtest/mysumtask_";

    /**
     * Creates a task covering {@code array[start..end)}.
     *
     * @param array values to sum
     * @param start first index, inclusive
     * @param end   last index, exclusive
     */
    MySumTaskLong(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    /**
     * Sums this task's range, either directly or by splitting it in two.
     *
     * @return the sum of {@code array[start..end)}
     */
    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: compute directly. The sum is calculated before any
            // file I/O so that a failed report write cannot corrupt the result.
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            writeReport(sum);
            System.out.println(String.format("compute %d~%d = %d", start, end, sum));
            return sum;
        }

        // Too large: split in half. Written as start + (end - start) / 2 so the
        // midpoint cannot overflow for large indices.
        int middle = start + (end - start) / 2;
        System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
        MySumTaskLong subtask1 = new MySumTaskLong(this.array, start, middle);
        MySumTaskLong subtask2 = new MySumTaskLong(this.array, middle, end);
        // invokeAll forks both halves and waits for them; join() then only fetches the results.
        invokeAll(subtask1, subtask2);
        long subresult1 = subtask1.join();
        long subresult2 = subtask2.join();
        long result = subresult1 + subresult2;
        System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
        return result;
    }

    /**
     * Writes the report file for a directly computed range: the covered indices
     * followed by the partial sum. Best effort only; an I/O failure is reported
     * on stderr and does not affect the task result.
     *
     * @param sum the partial sum to record
     */
    private void writeReport(long sum) {
        File file = new File(String.format("%s%03d.doc", strFileNamePrefix, start));
        // FileWriter creates the file if needed; try-with-resources flushes and closes it.
        try (FileWriter fileWriter = new FileWriter(file)) {
            for (int i = start; i < end; i++) {
                fileWriter.write(i + " ");
            }
            fileWriter.write("\r\n任务执行结果:" + sum + "!");
        } catch (IOException e) {
            System.err.println("Could not write report file " + file + ": " + e);
        }
    }
}
1.2测试
package com.tech.ability.myjacob;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
/**
-
@author kikop
-
@version 1.0
-
@project Name: TechnicalAbilityToolBox
-
@file Name: MySumTaskTest
-
@desc 功能描述
-
@date 2019/7/8
-
@time 22:39
-
@by IDE: IntelliJ IDEA
-
参考:https://www.liaoxuefeng.com/article/1146802219354112
*/
public class MySumTaskLongTest {private static void fillRandom(long[] array) {
for (int i = 0; i < array.length; i++) {
array[i] = i;
}
}public static void main(String[] args) {
// 创建随机数组成的数组: long[] array = new long[400]; fillRandom(array); // fork/join task: ForkJoinPool fjp = new ForkJoinPool(4); // 最大并发数4 ForkJoinTask<Long> task = new MySumTaskLong(array, 0, array.length); long startTime = System.currentTimeMillis(); Long result = fjp.invoke(task); long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
}
1.3 结果
split 0~400 ==> 0~200, 200~400
split 0~200 ==> 0~100, 100~200
split 200~400 ==> 200~300, 300~400
compute 0~100 = 4950
compute 200~300 = 24950
compute 100~200 = 14950
result = 4950 + 14950 ==> 19900
compute 300~400 = 34950
result = 24950 + 34950 ==> 59900
result = 19900 + 59900 ==> 79800
Fork/join sum: 79800 in 268 ms.
2 使用注意点
new Task1
new Task2
invokeAll(task1, task2)
task1.join()
task2.join()
网友评论