美文网首页JUC并发包
Executor家族-ForkJoinPool分支合并 工作窃取

Executor家族-ForkJoinPool分支合并 工作窃取

作者: 于情于你 | 来源:发表于2020-12-29 20:53 被阅读0次

        ForkJoin框架:就是在必要的情况下,将一个大任务,进行拆分成若干个小任务(拆到不能再拆时),再将一个个的小任务运算的结果进行join汇总。

    引用InfoQ的图: image.png

        用ForkJoin之前先了解一下工作窃取算法:
        工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:


    image.png

    工作窃取算法有什么用?

        假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如 A 线程负责处理 A 队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行
        说到底还是为了提高CPU资源利用率

        工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

        在用ForkJoin框架之前,要知道有三个任务类,分别是:ForkJoinTask(父类)、RecursiveAction(无返回值的任务)、RecursiveTask(有返回值的任务),在用的时候,我们只需要继承RecursiveAction和RecursiveTask就ok。


    image.png
    image.png

    代码示例:

    package com.yu.test.forkjoin;
    
    import java.util.concurrent.RecursiveTask;
    
    /**
     * @author yudiancheng
     */
    public class CalculationTask extends RecursiveTask<Long> {
    
        private Long start;
        private Long end;
    
        private static final long THRESHOLD = 10000L;
    
        public CalculationTask(Long start, Long end) {
            this.start = start;
            this.end = end;
        }
    
        public CalculationTask() {
        }
    
        @Override
        protected Long compute() {
            boolean flag = end - start <= THRESHOLD;
    
            long sum=0L;
            if(flag) {
    
                for(long i=start; i<=end; i++) {
                    sum += i;
                }
            } else {
    
                long middle = (start + end) / 2;
                CalculationTask left = new CalculationTask(start, middle);
    
                left.fork(); // 拆分,并且压入线程队列
                CalculationTask right = new CalculationTask(middle + 1, end);
                right.fork();
    
                sum = left.join() + right.join(); // 拿到结果合并
    
            }
            return sum;
        }
    }
    
    

    主函数:

    public static void main(String[] args) {
    
            ForkJoinPool forkJoinPool = new ForkJoinPool();
    
            CalculationTask calculationTask = new CalculationTask(1L, 10000L);
    
            Long invoke = forkJoinPool.invoke(calculationTask);
    
            System.out.println(invoke);
    
    
    }
    

        ForkJoinTask在执行过程中可能会抛出异常在主线程中无法捕获异常,ForkJoinTask 提供了 isCompletedAbnormally() 方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常。

    if(calculationTask.isCompletedAbnormally()) {
                calculationTask.getException();
     }
    

    相关文章

      网友评论

        本文标题:Executor家族-ForkJoinPool分支合并 工作窃取

        本文链接:https://www.haomeiwen.com/subject/cvjqoktx.html