美文网首页
Java并行读取多个文件

Java并行读取多个文件

作者: 0x70e8 | 来源:发表于2018-08-16 11:33 被阅读0次

    测试读取同一目录下的 1000 多个文件,分别使用以下两种方式实现,并返回读取到的总行数:

    • 使用JDK7 的forkjoin框架
    • 使用Java8 的并行流(内部基于forkjoin)

    code

    • RecursiveTask 实现类:RecursiveTask 是 Fork/Join 框架中带返回值的递归任务类型
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.util.Arrays;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * Fork/join task that counts the total number of lines in a set of files.
     *
     * <p>Ranges larger than {@link #THRESHOLD} files are split in half; one half is
     * forked and the other is computed by the current worker. Smaller ranges are
     * read sequentially.
     *
     * <p>Entries that cannot be read (for example directories or files that fail to
     * decode) are reported on {@code System.err} and skipped; lines read before the
     * failure are still counted.
     */
    public class ReadFileTask extends RecursiveTask<Integer> {

        private static final long serialVersionUID = 1L;

        /** Maximum number of files a single task reads without splitting further. */
        private static final int THRESHOLD = 20;

        private final File[] files;
        /** Inclusive start index of this task's slice of {@link #files}. */
        private final int from;
        /** Exclusive end index of this task's slice of {@link #files}. */
        private final int to;

        /**
         * Creates a task covering every file in the given array.
         *
         * @param files files to read; {@code null} (as returned by
         *              {@link File#listFiles()} on failure) is treated as no files
         */
        public ReadFileTask(File[] files) {
            this(files == null ? new File[0] : files, 0, files == null ? 0 : files.length);
        }

        /** Creates a sub-task over {@code files[from, to)} without copying the array. */
        private ReadFileTask(File[] files, int from, int to) {
            this.files = files;
            this.from = from;
            this.to = to;
        }

        /**
         * Counts the lines in this task's slice, splitting it binary-tree style when
         * it is larger than the threshold.
         *
         * @return total number of lines read
         */
        @Override
        protected Integer compute() {
            int size = to - from;
            if (size > THRESHOLD) {
                int mid = from + size / 2;
                ReadFileTask left = new ReadFileTask(files, from, mid);
                ReadFileTask right = new ReadFileTask(files, mid, to);
                // Fork one half and run the other in this worker so the thread does
                // useful work instead of only waiting on two forked children.
                left.fork();
                return right.compute() + left.join();
            }

            int total = 0;
            for (int i = from; i < to; i++) {
                total += countLines(files[i]);
            }
            return total;
        }

        /** Returns the number of lines read from {@code file} before EOF or an I/O error. */
        private static int countLines(File file) {
            int lines = 0;
            try (BufferedReader reader = Files.newBufferedReader(file.toPath())) {
                while (reader.readLine() != null) {
                    // processLine(line) would go here.
                    lines++;
                }
            } catch (IOException e) {
                // Best effort: keep going with the remaining files, but do not hide the failure.
                System.err.println("Skipping unreadable entry " + file + ": " + e);
            }
            return lines;
        }

        // Accessors omitted in the original listing.

    }
    
    • Client 包含并行流方法
    public class Client {
        public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
            Path path = Paths.get("C:\\Users\\Desktop\\all\\");
            useForkJoinPool(path);
            useParallelStream(path);
        }
    
        static void useForkJoinPool(Path path) throws InterruptedException, ExecutionException {
            long begin = System.currentTimeMillis();
    
            if (Files.exists(path) && Files.isDirectory(path)) {
                File[] list = path.toFile().listFiles();
                System.out.println(list.length);
                ReadFileTask task = new ReadFileTask(list);
                // 未指定pool内线程数,使用CPU core的数目
                ForkJoinPool fjp = new ForkJoinPool();
                ForkJoinTask<Integer> future = fjp.submit(task);
                System.out.println(future.get());
            }
    
            System.out.println("used:" + (System.currentTimeMillis() - begin) + "mills");
        }
        // java8 并行流 
        static void useParallelStream(Path path) throws IOException {
            long begin = System.currentTimeMillis();
            if (Files.exists(path) && Files.isDirectory(path)) {
                int[] count = { 0, 0 };
                @SuppressWarnings("resource")
                Stream<Path> files = Files.list(path);
                files.parallel().forEach(p -> {
                    count[0]++;
                    try (BufferedReader reader = Files.newBufferedReader(p)) {
                        String line = null;
                        while ((line = reader.readLine()) != null) {
                            count[1]++;
                            // processLine(line);
                        }
                    } catch (IOException e) {
    
                    }
                });
                System.out.println("files count:" + count[0]);
                System.out.println("line count:" + count[1]);
            }
            System.out.println("used:" + (System.currentTimeMillis() - begin) + "mills");
        }
    }
    
    • 结果
    // 结果
    // 1053
    // 6320
    // used:203mills
    // files count:1053
    // line count:6320
    // used:234mills
    

    相关文章

      网友评论

          本文标题:Java并行读取多个文件

          本文链接:https://www.haomeiwen.com/subject/ucmnbftx.html