美文网首页爱编程,爱生活
java concurrent 之 ForkJoinPool

java concurrent 之 ForkJoinPool

作者: 熬夜的猫头鹰 | 来源:发表于2018-06-16 21:53 被阅读24次

    java concurrent 之 ForkJoinPool

    ForkJoinPool在Java 7中被引入。ForkJoinPool类似于Java ExecutorService,但有一个区别。 ForkJoinPool可以轻松将任务分解成较小的任务,然后将其提交给ForkJoinPool。 任务可以将其工作分成较小的子任务,只要它能够分解任务即可。 它可能听起来有点抽象,所以在这个fork和join教程中,我将解释ForkJoinPool如何工作,以及分裂任务如何工作。

    解释Fork和Join

    在我们看看ForkJoinPool之前,我想解释一下fork和join的原理。

    Fork和Join原则由递归执行的两个步骤组成。 这两个步骤是fork步骤和join步骤。

    Fork

    使用fork和join原理的任务可以将(自己)分割成更小的子任务

    image

    通过将其自身分解为子任务,每个子任务可以由不同的CPU或同一CPU上的不同线程并行执行。

    如果任务给出的工作足够大,任务只会分解成子任务,这样才有意义。 将任务分解为子任务有一个开销,因此对于少量工作,此开销可能大于通过并发执行子任务而实现的加速。

    将任务分解为子任务的时间限制也称为阈值。 每个任务都由决定一个明智的门槛决定。 这在很大程度上取决于正在做的工作。

    其实阈值的问题就是在递归算法中的退出条件相似

    Join

    当一个任务已经分裂成子任务时,任务等待直到子任务完成执行。

    子任务完成执行后,任务可以将所有结果加入(合并)为一个结果。 如下图所示:

    image

    ForkJoinPool

    ForkJoinPool是一个特殊的线程池,旨在使用fork-and-join任务拆分工作。 ForkJoinPool位于java.util.concurrent包中,因此完整的类名称为java.util.concurrent.ForkJoinPool。

    创建ForkJoinPool

    
    ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    
    

    ForkJoinTask任务分为两类

    • RecursiveAction 用于没有返回结果的任务。

    • RecursiveTask 用于有返回结果的任务

    ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

    使用Fork/Join框架 Demo

    package com.viashare.forkjoin;
    
    import java.util.concurrent.*;
    
    /**
     * Created by Jeffy on 16/01/12.
     */
    public class ForkJoinMain {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            ForkJoinTask<Integer> future = forkJoinPool.submit(new CountTask(1, 5));
            System.err.println(future.get());
    
        }
    
        static class CountTask extends RecursiveTask<Integer> {
    
            private static final int Threshold = 3;
    
            private int start;
    
            private int end;
    
            public CountTask(int start, int end) {
                this.start = start;
                this.end = end;
            }
    
            @Override
            protected Integer compute() {
                int sum = 0;
                int temp = end - start;
                System.err.println(temp);
                if (temp <= Threshold) {
                    for (int i = start; i <=end; i++) {
                        sum += i;
                    }
                } else {
                    int millde = (end+start)/Threshold;
                    CountTask task1 = new CountTask(start, millde);
                    CountTask task2 = new CountTask(millde+1, end);
                    task1.fork();
                    task2.fork();
    
                    int sum1 = task1.join();
                    int sum2 = task2.join();
                    System.err.println("sum1  "+sum1);
                    System.err.println("sum2  "+sum2);
                    sum = sum1 + sum2;
                }
    
                return sum;
            }
        }
    }
    
    
    

    Demo2

    
    package com.viashare.forkjoin;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * Created by Jeffy on 16/01/12.
     */
    public class ForkJoinmain2 {
    
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool(4);
            MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
    
            long mergedResult = forkJoinPool.invoke(myRecursiveTask);
    
            System.out.println("mergedResult = " + mergedResult);
        }
    
        static class MyRecursiveTask extends RecursiveTask<Long> {
    
            private long workLoad = 0;
    
            public MyRecursiveTask(long workLoad) {
                this.workLoad = workLoad;
            }
    
            protected Long compute() {
                //if work is above threshold, break tasks up into smaller tasks
                if (this.workLoad > 16) {
                    System.out.println("Splitting workLoad : " + this.workLoad);
                    List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>();
                    subtasks.addAll(createSubtasks());
    
                    for (MyRecursiveTask subtask : subtasks) {
                        subtask.fork();
                    }
    
                    long result = 0;
                    for (MyRecursiveTask subtask : subtasks) {
                        result += subtask.join();
                    }
                    return result;
    
                } else {
                    System.out.println("Doing workLoad myself: " + this.workLoad);
                    return workLoad * 3;
                }
            }
    
            private List<MyRecursiveTask> createSubtasks() {
                List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>();
    
                MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
                MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);
    
                subtasks.add(subtask1);
                subtasks.add(subtask2);
    
                return subtasks;
            }
        }
    }
    
    

    Fork/Join框架的异常处理

    ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码:

    if(task.isCompletedAbnormally())
    {
        System.out.println(task.getException());
    }
    
    

    getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

    Fork/Join框架的实现原理

    相关文章

      网友评论

        本文标题:java concurrent 之 ForkJoinPool

        本文链接:https://www.haomeiwen.com/subject/bhxieftx.html