美文网首页
Fork/Join框架学习

Fork/Join框架学习

作者: Ggx的代码之旅 | 来源:发表于2018-12-20 18:08 被阅读165次

    什么是Fork/Join框架

    Fork/Join框架是一组允许程序员利用多核处理器支持的并行执行的API。它使用了“分而治之”策略:把非常大的问题分成更小的部分,反过来,小部分又可以进一步分成更小的部分,递归地直到一个部分可以直接解决。这被叫做“fork”。

    然后所有部件在多个处理核心上并行执行。每个部分的结果被“join”在一起以产生最终结果。因此,框架的名称是“Fork/Join”。

    下面的为代码展示了分治策略如何与Fork/Join框架一起工作:

    if (problemSize < threshold)
        solve problem directly
    else {
        break problem into subproblems
        recursively solve each problem
        combine the results
    }
    

    Fork/Join框架在JDk7中被加入,并在JDK8中进行了改进。它用了Java语言中的几个新特性,包括并行的Stream API和排序。

    Fork/Join框架简化了并行程序的原因有:

    • 它简化了线程的创建,在框架中线程是自动被创建和管理。
    • 它自动使用多个处理器,因此程序可以扩展到使用可用处理器。

    由于支持真正的并行执行,Fork/Join框架可以显著减少计算时间,并提高解决图像处理、视频处理、大数据处理等非常大问题的性能。

    关于Fork/Join框架的一个有趣的地方是:它使用工作窃取算法来平衡线程之间的负载:如果一个工作线程没有事情要做,它可以从其他仍然忙碌的线程窃取任务。

    理解Fork/Join框架API

    Fork/Join框架在java.util.concurrent包下被实现。它的核心有4个类:

    • ForkJoinTask<V>: 这是一个抽象任务类,并且运行在ForkJoinPool中。
    • ForkJoinPool:这是一个线程池管理并运行众多ForkJoinTask任务。
    • RecursiveAction: ForkJoinTask的子类,这个类没有返回值。
    • RecursiveTask<V>: ForkJoinTask的子类,有返回值。

    基本上,我们解决问题的代码是在RecursiveAction或者RecursiveTask中进行的,然后将任务提交由ForkJoinPool`执行,ForkJoinPool处理从线程管理到多核处理器的利用等各种事务。

    我们先来理解一下这些类中的关键方法。

    ForkJoinTask<V>

    这是一个运行在ForkJoinPool中的抽象的任务类。类型V指定了任务的返回结果。ForkJoinTask是一个类似线程的实体,它表示任务的轻量级抽象,而不是实际的执行线程。该机制允许由ForkJoinPool中的少量实际线程管理大量任务。其关键方法是:

    • final ForkJoinTask<V> fork()
    • final V join()
    • final V invoke()

    fork()方法提交并执行异步任务,该方法返回ForkJoinTask并且调用线程继续运行。

    join()方法等待任务直到返回结果。

    invoke()方法是组合了fork()join(),它开始一个任务并等待结束返回结果。

    此外,ForkJoinTask中还提供了用于一次调用多个任务的两个静态方法

    • static void invokeAll(ForkJoinTask<?> task1, ForkJoinTask<?> task2) :执行两个任务
    • static void invokeAll(ForkJoinTask<?>… taskList):执行任务集合

    RecursiveAction

    这是一个递归的ForkJoinTask子类,不返回结果。Recursive意思是任务可以通过分治策略分成自己的子任务(在下面的下一节中,您将看到如何划分代码示例)。

    我们必须重写compute()方法,并将计算代码写在其中:

    protected abstract void compute();

    RecursiveTask<V>

    RecursiveAction一样,但是RecursiveTask有返回结果,结果类型由V指定。我们仍然需要重写compute()方法:

    protected abstract V compute();

    ForkJoinPool

    这是Fork/Join框架的核心类。它负责线程的管理和ForkJoinTask的执行,为了执行ForkJoinTask,首先需要获取到ForkJoinPool的实例。

    有两种构造器方式可以获取ForkJoinPool的实例,第一种使用构造器创建:

    • ForkJoinPool(): 使用默认的构造器创建实例,该构造器创建出的池与系统中可用的处理器数量相等。
    • ForkJoinPool(int parallelism):该构造器指定处理器数量,创建具有自定义并行度级别的池,该级别的并行度必须大于0,且不超过可用处理器的实际数量。

    并行性的级别决定了可以并发执行的线程的数量。换句话说,它决定了可以同时执行的任务的数量——但不能超过处理器的数量。

    但是,这并不限制池可以管理的任务的数量。ForkJoinPool可以管理比其并行级别多得多的任务。

    获取ForkJoinPool实例的第二种方法是使用以下ForkJoinPool的静态方法获取公共池实例:

    public static ForkJoinPool commonPool();

    这种方式创建的池不受shutdown()或者shutdownNow()方法的影响,但是他会在System.exit()时会自动中止。任何依赖异步任务处理的程序在主体程序中止前都应该调用awaitQuiescence()方法。该方式是静态的,可以自动被使用。

    ForkJoinPool中执行ForkJoinTasks

    在创建好ForkJoinPool实例之后,可以使用下面的方法执行任务:

    • <T>T invoke(ForkJoinTask<T> task):执行指定任务并返回结果,该方法是异步的,调用的线程会一直等待直到该方法返回结果,对于RecursiveAction任务来说,参数类型是Void.
    • void execute(ForkJoinTask<?> task):异步执行指定的任务,调用的线程一直等待知道任务完成才会继续执行。

    另外,也可以通过ForkJoinTask自己拥有的方法fork()invoke()执行任务。在这种情况下,如果任务还没在ForkJoinPool中运行,那么commonPool()将会自动被使用。

    值得注意的一点是:ForkJoinPool使用的是守护线程,当所有的用户线程被终止是它也会被终止,这意味着可以不必显示的关闭ForkPoolJoin(虽然这样也可以)。如果是common pool的情况下,调用shutdown没有任何效果,应为这个池总是可用的。

    好了,现在来看看一些例子。

    案例

    使用RecursiveAction

    这里例子中,看一下如果使用Fork/Join框架去执行一个没有返回值的任务。

    假设要对一个很大的数字数组进行变换,为了简单简单起见,转换只需要将数组中的每个元素乘以指定的数字。下面的代码用于转换任务:

    import java.util.concurrent.*;
     
    public class ArrayTransform extends RecursiveAction {
        int[] array;
        int number;
        int threshold = 100_000;
        int start;
        int end;
     
        public ArrayTransform(int[] array, int number, int start, int end) {
            this.array = array;
            this.number = number;
            this.start = start;
            this.end = end;
        }
        
        @Override
        protected void compute() {
            if (end - start < threshold) {
                computeDirectly();
            } else {
                int middle = (end + start) / 2;
     
                ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
                ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
     
                invokeAll(subTask1, subTask2);
            }
        }
     
        protected void computeDirectly() {
            for (int i = start; i < end; i++) {
                array[i] = array[i] * number;
            }
        }
    }
    

    可以看到,这是一个RecursiveAction的子类,我们重写了compute()方法。

    数组和数字从它的构造函数传递。参数start和end指定要处理的数组中的元素的范围。如果数组的大小大于阈值,这有助于将数组拆分为子数组,否则直接对整个数组执行计算。

    观察else中的代码片段:

    protected void compute() {
        if (end - start < threshold) {
            computeDirectly();
        } else {
            int middle = (end + start) / 2;
     
            ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
            ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
     
            invokeAll(subTask1, subTask2);
        }
    }
    

    这里,将数组分成两个部分,并分别创建他们的子任务,反过来,子任务也可以递归的进一步划分为更小的子任务,直到其大小小于直接调用computeDirectly();方法的的阈值。

    然后,在main函数中创建ForkJoinPool执行任务:

    ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
    ForkJoinPool pool = new ForkJoinPool();
    pool.invoke(mainTask);
    

    或者使用common pool执行任务:

    ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
    mainTask.invoke();
    

    这里是全部的测试程序:

    import java.util.*;
    import java.util.concurrent.*;
     
    public class ForkJoinRecursiveActionTest {
        static final int SIZE = 10_000_000;
        static int[] array = randomArray();
     
        public static void main(String[] args) {
     
            int number = 9;
     
            System.out.println("数组中的初始元素: ");
            print();
     
            ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
            ForkJoinPool pool = new ForkJoinPool();
            pool.invoke(mainTask);
     
            System.out.println("并行计算之后的元素:");
            print();
        }
     
        static int[] randomArray() {
            int[] array = new int[SIZE];
            Random random = new Random();
     
            for (int i = 0; i < SIZE; i++) {
                array[i] = random.nextInt(100);
            }
     
            return array;
        }
     
        static void print() {
            for (int i = 0; i < 10; i++) {
                System.out.print(array[i] + ", ");
            }
            System.out.println();
        }
    }
    

    如您所见,使用随机生成的1,000万个元素数组进行测试。由于数组太大,我们在计算前后只打印前10个元素,看效果如何:

    数组中的初始元素:
    42, 98, 43, 14, 9, 92, 33, 18, 18, 76,
    并行计算之后的元素:
    378, 882, 387, 126, 81, 828, 297, 162, 162, 684,
    

    使用RecursiveTask

    这个例子中,展示了如何使用带有返回值的任务,下面的任务计算在一个大数组中出现偶数的次数:

    import java.util.concurrent.*;
     
    public class ArrayCounter extends RecursiveTask<Integer> {
        int[] array;
        int threshold = 100_000;
        int start;
        int end;
     
        public ArrayCounter(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }
     
        protected Integer compute() {
            if (end - start < threshold) {
                return computeDirectly();
            } else {
                int middle = (end + start) / 2;
     
                ArrayCounter subTask1 = new ArrayCounter(array, start, middle);
                ArrayCounter subTask2 = new ArrayCounter(array, middle, end);
     
                invokeAll(subTask1, subTask2);
     
     
                return subTask1.join() + subTask2.join();
            }
        }
     
        protected Integer computeDirectly() {
            Integer count = 0;
     
            for (int i = start; i < end; i++) {
                if (array[i] % 2 == 0) {
                    count++;
                }
            }
     
            return count;
        }
    }
    

    如你所见,这个类是RecursiveTask的子类并且重写了compute()方法,并且返回了一个整型的结果。

    这里还使用了join()方法去合并子任务的结果:

    return subTask1.join() + subTask2.join();
    

    测试程序就和RecursiveAction的一样:

    import java.util.*;
    import java.util.concurrent.*;
     
    public class ForkJoinRecursiveTaskTest {
        static final int SIZE = 10_000_000;
        static int[] array = randomArray();
     
        public static void main(String[] args) {
     
            ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE);
            ForkJoinPool pool = new ForkJoinPool();
            Integer evenNumberCount = pool.invoke(mainTask);
     
            System.out.println("偶数的个数: " + evenNumberCount);
        }
     
        static int[] randomArray() {
            int[] array = new int[SIZE];
            Random random = new Random();
     
            for (int i = 0; i < SIZE; i++) {
                array[i] = random.nextInt(100);
            }
     
            return array;
        }
     
    }
    

    运行程序就会看到如下的结果:

    偶数的个数: 5000045
    

    并行性试验

    这个例子展示并行性的级别如何影响计算时间:

    ArrayCounter类让阈值可以通过构造器传入:

    import java.util.concurrent.*;
     
    public class ArrayCounter extends RecursiveTask<Integer> {
        int[] array;
        int threshold;
        int start;
        int end;
     
        public ArrayCounter(int[] array, int start, int end, int threshold) {
            this.array = array;
            this.start = start;
            this.end = end;
            this.threshold = threshold;
        }
     
        protected Integer compute() {
            if (end - start < threshold) {
                return computeDirectly();
            } else {
                int middle = (end + start) / 2;
     
                ArrayCounter subTask1 = new ArrayCounter(array, start, middle, threshold);
                ArrayCounter subTask2 = new ArrayCounter(array, middle, end, threshold);
     
                invokeAll(subTask1, subTask2);
     
     
                return subTask1.join() + subTask2.join();
            }
        }
     
        protected Integer computeDirectly() {
            Integer count = 0;
     
            for (int i = start; i < end; i++) {
                if (array[i] % 2 == 0) {
                    count++;
                }
            }
     
            return count;
        }
    }
    

    测试程序将并行度级别和阈值作为参数传递:

    import java.util.*;
    import java.util.concurrent.*;
     
    public class ParallelismTest {
        static final int SIZE = 10_000_000;
     
        static int[] array = randomArray();
     
        public static void main(String[] args) {
            int threshold = Integer.parseInt(args[0]);
            int parallelism = Integer.parseInt(args[1]);
     
            long startTime = System.currentTimeMillis();
     
            ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE, threshold);
            ForkJoinPool pool = new ForkJoinPool(parallelism);
            Integer evenNumberCount = pool.invoke(mainTask);
     
            long endTime = System.currentTimeMillis();
     
            System.out.println("偶数的个数: " + evenNumberCount);
     
            long time = (endTime - startTime);
            System.out.println("执行时间: " + time + " ms");
        }
     
        static int[] randomArray() {
            int[] array = new int[SIZE];
            Random random = new Random();
     
            for (int i = 0; i < SIZE; i++) {
                array[i] = random.nextInt(100);
            }
     
            return array;
        }
     
    }
    

    该程序允许您使用不同的并行度和阈值轻松测试性能。注意,它在最后打印执行时间。尝试用不同的参数多次运行这个程序,并观察执行时间。

    结论

    • Fork/Join框架的设计简化了java语言的并行程序
    • ForkJoinPoolFork/Join框架的核心,它允许多个ForkJoinTask请求由少量实际线程执行,每个线程运行在单独的处理核心上
    • 既可以通过构造器也可以通过静态方法common pool去获取ForkJoinPool的实例
    • ForkJoinTask是一个抽象类,它表示的任务比普通线程更轻。通过覆盖其compute()方法实现计算逻辑
    • RecursiveAction是一个没有返回值的ForkJoinTask
    • RecursiveTask是一个有返回值的ForkJoinTask
    • ForkJoinPool与其它池的不同之处在于,它使用了工作窃取算法,该算法允许一个线程完成了可以做的事情,从仍然繁忙的其他线程窃取任务
    • ForkJoinPool中的线程是守护线程,不必显式地关闭池
    • 执行一个ForkJoinTask既可以通过调用它自己的invoke()fork()方法,也可以提交任务给ForkJoinPool并调用它的invoke()或者execute()方法
    • 直接使用ForkJoinTask自身的方法执行任务,如果它还没运行在ForkJoinPool中那么将运行在common pool
    • ForkJoinTask中使用join()方法,可以合并子任务的结果
    • invoke()方法会等待子任务完成,但是execute()方法不会

    相关文章

      网友评论

          本文标题:Fork/Join框架学习

          本文链接:https://www.haomeiwen.com/subject/iakukqtx.html