美文网首页
Java 并发之 ForkJoinPool

Java 并发之 ForkJoinPool

作者: 放开那个BUG | 来源:发表于2020-12-23 23:59 被阅读0次

一、前言

好久没有正正经经写一篇文章了,都是随记写得多。但是今天看了 Java 的 ForkJoinPool,我真的是忍不住要来一篇了。先不说它的源码,光它的使用而言,它把分治法体现的淋漓尽致。比如我们一般能用分治法解决的题目,每个小问题以前从头到尾都是一个线程计算,后面再合并成一个结果。但我们其实可以想一下,如果每个小问题分到最后都能以一个单独的线程,岂不是很快!!!ForkJoinPool 为此而生。

二、实例

比如我们以前写归并排序,都是这样的。

import java.util.Arrays;

public class MergeSort {

    public static void mergeSort(int[] arr) {
        if (arr == null || arr.length < 2) {
            return;
        }
        mergeSort(arr, 0, arr.length - 1);
    }

    public static void mergeSort(int[] arr, int l, int r) {
        if (l == r) {
            return;
        }
        int mid = l + ((r - l) >> 1);
        mergeSort(arr, l, mid);
        mergeSort(arr, mid + 1, r);
        merge(arr, l, mid, r);
    }

    public static void merge(int[] arr, int l, int m, int r) {
        int[] help = new int[r - l + 1];
        int i = 0;
        int p1 = l;
        int p2 = m + 1;
        while (p1 <= m && p2 <= r) {
            help[i++] = arr[p1] < arr[p2] ? arr[p1++] : arr[p2++];
        }
        while (p1 <= m) {
            help[i++] = arr[p1++];
        }
        while (p2 <= r) {
            help[i++] = arr[p2++];
        }
        for (i = 0; i < help.length; i++) {
            arr[l + i] = help[i];
        }
    }

    
    // for test
    public static void main(String[] args) {
                int[] array = {2, 3, 1, 6, 5};
        MergeSort.mergeSort(array);
    }

}

使用ForkJoinPool 之后,代码变成了这样。其实好像没啥变化,就依旧还是递归的思想,只是创建子问题那当作任务放入线程池中(当然,这种子任务的创建方便多了)。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * @author xushu
 * @create 12/23/20 11:44 PM
 * @desc
 */
public class ForkJoinMergeSort {


    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(6);
        int[] arr = {3, 2, 6, 1, 7, 10, 2, 3, 5, 6};
        Task task = new Task(arr, 0, arr.length - 1);
        forkJoinPool.invoke(task);

        for (int i : arr) {
            System.out.println(i);
        }
    }


    
    static class Task extends RecursiveTask<Void> {

        private int[] arr;
        private int start;
        private int end;

        public Task(int[] arr, int start, int end) {
            this.arr = arr;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Void compute() {
            if (start == end) {
                // return null 没办法,必须要求返回值。。。。
                return null;
            }
            int mid = start + ((end - start) >> 1);
            Task left = new Task(arr, start, mid);
            Task right = new Task(arr, mid + 1, end);

            left.fork();
            right.fork();

            left.compute();
            right.compute();

            merge(arr, start, mid, end);

            // return null 没办法,必须要求返回值。。。。
            return null;
        }
    }

    public static void merge(int[] arr, int l, int m, int r) {
        int[] help = new int[r - l + 1];
        int i = 0;
        int p1 = l;
        int p2 = m + 1;
        while (p1 <= m && p2 <= r) {
            help[i++] = arr[p1] < arr[p2] ? arr[p1++] : arr[p2++];
        }
        while (p1 <= m) {
            help[i++] = arr[p1++];
        }
        while (p2 <= r) {
            help[i++] = arr[p2++];
        }
        for (i = 0; i < help.length; i++) {
            arr[l + i] = help[i];
        }
    }
}

当然,如果理解不了,其实这篇文章中的1到100的数字和最为直观简单,可以参照(https://my.oschina.net/xinxingegeya/blog/3007257)。

三、简略的原理分析

这篇文章顺序相反,先说使用后简略原理,注意是没有深入看源码。。。。。

参数解释

它的初始化代码如下:

    /**
     * Creates a {@code ForkJoinPool} with the given parameters, without
     * any security checks or parameter validation.  Invoked directly by
     * makeCommonPool.
     */
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

重要的参数解释:

  • 1.parallelism:并行度( the parallelism level),默认情况下跟我们机器的cpu个数保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我们机器运行时可用的CPU个数
    1. factory:创建新线程的工厂( the factory for creating new threads)。默认情况下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。
    1. handler:线程异常情况下的处理器,该处理器在线程执行任务时由于某些无法预料到的错误而导致任务线程中断时进行一些处理,默认情况为null。
    1. asyncMode:这个参数要注意,在ForkJoinPool中,每一个工作线程都有一个独立的任务队列,asyncMode表示工作线程内的任务队列是采用何种方式进行调度,可以是先进先出FIFO,也可以是后进先出LIFO。如果为true,则线程池中的工作线程则使用先进先出方式进行任务调度,默认情况下是false。
      ForkJoinPool 有一个 Async Mode ,效果是工作线程在处理本地任务时也使用 FIFO 顺序。这种模式下的 ForkJoinPool 更接近于是一个消息队列,而不是用来处理递归式的任务。

ForkJoinPool work stealing 算法

work stealing
  • 1.ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
  • 2.每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
  • 3.每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
  • 4.在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
  • 5.在既没有自己的任务,也没有可以窃取的任务时,进入休眠。

ForkJoinTask fork 方法

fork() 做的工作只有一件事,既是把任务推入当前工作线程的工作队列里。

    /**
     * Arranges to asynchronously execute this task in the pool the
     * current task is running in, if applicable, or using the {@link
     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized.  Subsequent modifications to the state of this
     * task or any data it operates on are not necessarily
     * consistently observable by any thread other than the one
     * executing it unless preceded by a call to {@link #join} or
     * related methods, or a call to {@link #isDone} returning {@code
     * true}.
     *
     * @return {@code this}, to simplify usage
     */
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

四、参考资料

https://my.oschina.net/xinxingegeya/blog/3007257

相关文章

网友评论

      本文标题:Java 并发之 ForkJoinPool

      本文链接:https://www.haomeiwen.com/subject/kilpnktx.html