测试打开目录下1000+文件,使用两种方式,返回处理的行数:
- 使用 JDK 7 的 Fork/Join 框架(ForkJoinPool + RecursiveTask)
- 使用 Java 8 的并行流(内部基于 Fork/Join 的公共线程池 ForkJoinPool.commonPool())
code
- ReadFileTask:RecursiveTask 的实现类,即 Fork/Join 框架中带返回值的递归任务
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
 * Fork/Join task that counts the total number of lines in a set of files.
 *
 * <p>A task covering more than {@link #THRESHOLD} files splits its range in
 * half; one half is forked to the pool while the other half is computed in the
 * current worker thread, and the two partial counts are added. A task covering
 * {@code THRESHOLD} files or fewer reads its files sequentially.
 *
 * <p>Reading is best-effort: a file that cannot be read (or fails part-way) is
 * reported on {@code System.err} and contributes only the lines read before
 * the failure.
 */
public class ReadFileTask extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;

    /** Largest number of files a single task reads sequentially before splitting. */
    private static final int THRESHOLD = 20;

    /** Shared backing array; sub-tasks address it by index range instead of copying. */
    private final File[] files;
    /** Inclusive start index of this task's range in {@link #files}. */
    private final int from;
    /** Exclusive end index of this task's range in {@link #files}. */
    private final int to;

    /**
     * Creates a task that counts the lines of every file in {@code files}.
     *
     * @param files files to read; {@code null} (as returned by
     *              {@link File#listFiles()} on an I/O error) is treated as empty
     */
    public ReadFileTask(File[] files) {
        this(files == null ? new File[0] : files, 0, files == null ? 0 : files.length);
    }

    /** Creates a sub-task over the half-open index range {@code [from, to)}. */
    private ReadFileTask(File[] files, int from, int to) {
        this.files = files;
        this.from = from;
        this.to = to;
    }

    /**
     * Counts the lines in this task's range, splitting binary-tree style while
     * the range is larger than {@link #THRESHOLD}.
     *
     * @return total number of lines read from the files in range
     */
    @Override
    protected Integer compute() {
        int size = to - from;
        if (size > THRESHOLD) {
            int mid = from + size / 2;
            ReadFileTask left = new ReadFileTask(files, from, mid);
            ReadFileTask right = new ReadFileTask(files, mid, to);
            // Fork only one half and compute the other in this thread, so the
            // current worker does useful work instead of just waiting on joins.
            left.fork();
            int rightCount = right.compute();
            // join() rethrows any failure of the forked half as an unchecked exception.
            return left.join() + rightCount;
        }

        int total = 0;
        for (int i = from; i < to; i++) {
            total += countLines(files[i]);
        }
        return total;
    }

    /**
     * Counts the lines of a single file, decoding it as UTF-8.
     *
     * @param file file to read
     * @return number of lines read; on an I/O error, the lines read before it occurred
     */
    private static int countLines(File file) {
        int lines = 0;
        try (BufferedReader reader = Files.newBufferedReader(file.toPath())) {
            while (reader.readLine() != null) {
                // processLine(line) would go here.
                lines++;
            }
        } catch (IOException e) {
            // Best-effort: keep counting the remaining files, but do not hide the failure.
            System.err.println("Failed to read " + file + ": " + e);
        }
        return lines;
    }
}
- Client 包含并行流方法
public class Client {
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
Path path = Paths.get("C:\\Users\\Desktop\\all\\");
useForkJoinPool(path);
useParallelStream(path);
}
static void useForkJoinPool(Path path) throws InterruptedException, ExecutionException {
long begin = System.currentTimeMillis();
if (Files.exists(path) && Files.isDirectory(path)) {
File[] list = path.toFile().listFiles();
System.out.println(list.length);
ReadFileTask task = new ReadFileTask(list);
// 未指定pool内线程数,使用CPU core的数目
ForkJoinPool fjp = new ForkJoinPool();
ForkJoinTask<Integer> future = fjp.submit(task);
System.out.println(future.get());
}
System.out.println("used:" + (System.currentTimeMillis() - begin) + "mills");
}
// java8 并行流
static void useParallelStream(Path path) throws IOException {
long begin = System.currentTimeMillis();
if (Files.exists(path) && Files.isDirectory(path)) {
int[] count = { 0, 0 };
@SuppressWarnings("resource")
Stream<Path> files = Files.list(path);
files.parallel().forEach(p -> {
count[0]++;
try (BufferedReader reader = Files.newBufferedReader(p)) {
String line = null;
while ((line = reader.readLine()) != null) {
count[1]++;
// processLine(line);
}
} catch (IOException e) {
}
});
System.out.println("files count:" + count[0]);
System.out.println("line count:" + count[1]);
}
System.out.println("used:" + (System.currentTimeMillis() - begin) + "mills");
}
}
- 结果
// 结果
// 1053
// 6320
// used:203mills
// files count:1053
// line count:6320
// used:234mills
网友评论