美文网首页
Fork/Join框架

Fork/Join框架

作者: 全栈未遂工程师 | 来源:发表于2016-08-09 16:46 被阅读159次

    什么是Fork/Join框架

    Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

    • Fork就是把一个大任务切分为若干子任务并行的执行。
    • Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。

    工作窃取算法

    就是把一个大任务分成若干小任务,放到若干队列,一个队列对应一个线程。有的线程先把一个队列中的任务执行完了,那么这个线程从另一个队列中取任务来执行。

    两个线程从一个队列中取任务来执行,那么存在竞争关系,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

    Fork/Join框架的介绍

    • 分割任务:一个fork类来把大任务分割成子任务,可能子任务还比较大,所以需要不同分割,直到分割的任务足够小。
    • 合并结果:子任务执行的结果都放在一个队列中,启动一个线程从队列里拿数据然后合并数据。

    怎么实现:

    • 首先创建ForkJoin任务。不需要直接继承ForkJoinTask类,只需要继承它的子类。
      • RecursiveAction:用于没有返回结果的任务
      • RecursiveTask:用于有返回结果的任务
    • ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,会随机从其他工作线程的队列尾部获取一个任务。

    使用Fork/Join框架

    /**
     * ClassName: CountTask 
     * @Description: 计算1+2+3+4+5的结果。
     * @author Panyk
     * @date 2016年8月9日
     */
    package com.forkjoin;
    
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;
    import java.util.concurrent.RecursiveTask;
    
    public class CountTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 2;//阈值
        private int start;
        private int end;
        
        public CountTask(int start, int end) {
            super();
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Integer compute() {
            int sum = 0;
            //如果任务足够小,那么执行计算任务
            boolean canCompute = (end-start)<THRESHOLD;
            if(canCompute){
                System.out.println(start + "--" + end);
                for(int i=start; i<=end; i++){
                    sum+=i;
                }
            }else{
                //如果任务大于阈值,分裂成两个子任务计算
                int middle = (start + end)/2;
                CountTask leftTask = new CountTask(start,middle);
                CountTask rightTask = new CountTask(middle+1, end);
                //执行子任务
                leftTask.fork();
                rightTask.fork();
                //等待子任务执行完,得到结果
                int leftRes = leftTask.join();
                int rightRes = rightTask.join();
                //合并子任务
                sum = leftRes + rightRes;
            }
            return sum;
        }
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            //生成一个计算任务,计算1加到4
            CountTask task = new CountTask(1,5);
            //执行一个任务
            Future<Integer> result = forkJoinPool.submit(task);
            if(task.isCompletedAbnormally()) {//是否有异常
                System.out.println(task.getException());
            }
            try{
                System.out.println(result.get());
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    

    Fork/Join框架的异常处理

    ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。

    getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

    相关文章

      网友评论

          本文标题:Fork/Join框架

          本文链接:https://www.haomeiwen.com/subject/yordsttx.html