美文网首页
java 多线程 -- Fork/Join

java 多线程 -- Fork/Join

作者: aix91 | 来源:发表于2019-01-14 18:52 被阅读0次

    1. 简介

    Fork/Join 框架,将大任务分解成可以直接执行的小任务,然后将小任务的结果汇总得到大任务的结果。通过这种方式,Fork/Join 能试着去使用所有可用的处理器,以达到加速处理多线程的目的。

    2. ForkJoinPool

    ForkJoinPool是该框架的核心,它实现了AbstractExecutorService,并且管理着工作线程(ForkJoinWorkerThread). ForkJoinPool并不会为每一个ForkJoinTask分配一个线程,而是为每一个工作线程分配一个deque, 每一个deque里面存放着多个ForkJoinTask。正是这样的结构,Fork/Join才能实现"work-stealing"算法,从而增加多线程的使用效率。

    • work-stealing算法
      work-stealing基本思路
      通常情况下,一个工作线程从自己的 deque 中pop出一个ForkJoinTask (LIFO) 并执行。当自己的deque里没有task时,就会随机地从其他工作线程的 deque 中take 一个ForkJoinTask (FIFO) 并执行。当工作线程遇到join时,它会执行其他的task,这样所有的task都不会阻塞其他task的执行。
    • ForkJoinPool创建
    1. 通过ForkJoinPool.common()方法创建一个ForkJoinPool实例:该方法默认会创建 "Runtime.getRuntime().availableProcessors() - 1"个工作线程。
    2. ForkJoinPool forkJoinPool = new ForkJoinPool(2);

    3. ForkJoinTask<V>

    ForkJoinTask是能在ForkJoinPool中执行的基础task,他有两个实现类:RecursiveAction(无返回值), RecursiveTask<V>(有返回值)

    4. submit task to ForkJoinPool

    • ForkJoin 的 invoke()方法 可以fork并执行task
    • 先execute(task), 再join()获取结果

    5. ForkJoinPool VS ExecutorService

    ForkJoinPool只是ExecutorService的一个补充。当一个任务可以递归分解成多个小任务时,使用ForkJoin效率会更高;当多个任务之间没有关联时,ExecutorService是更好的选择。

    6. 实例

    • 创建Task
    public class TopicCountTask extends RecursiveTask<List<TopicCountModel>> {
        private List<Function<List<Long>, TopicCountModel>> countFunctionList;
        private List<Long> trackIds;
        public TopicCountTask(List<Function<List<Long>, TopicCountModel>> countFunctionList, List<Long> trackIds) {
            this.countFunctionList = countFunctionList;
            this.trackIds = trackIds;
        }
        @Override
        protected List<TopicCountModel> compute() {
            if (countFunctionList.size() == 1) {
               //执行任务
                TopicCountModel model = countFunctionList.get(0).apply(trackIds);
                System.out.println("count service thread" + Thread.currentThread().getName());
                return Arrays.asList(model);
            } else {
                List<TopicCountModel> result = new ArrayList<>();
                int length = countFunctionList.size();
                int mid = length / 2;
                List<Function<List<Long>, TopicCountModel>> leftList = countFunctionList.subList(0, mid);
                List<Function<List<Long>, TopicCountModel>> rightList = countFunctionList.subList(mid, length);
                // 分解任务
                TopicCountTask taskLeft = new TopicCountTask(leftList, trackIds);
                TopicCountTask taskRight = new TopicCountTask(rightList, trackIds);
                taskLeft.fork();
                taskRight.fork();
                result.addAll(taskLeft.join());
                result.addAll(taskRight.join());
                return result;
            }
        }
    }
    
    
    • 在ForkJoin中执行Task
            //创建ForkJoinPool
            forkJoinPool = new ForkJoinPool(5);
            //执行ForkJoinTask
            List<TopicCountModel> list = forkJoinPool.invoke(new TopicCountTask(functionList, trackIds));
    
    

    相关文章

      网友评论

          本文标题:java 多线程 -- Fork/Join

          本文链接:https://www.haomeiwen.com/subject/fwnorqtx.html