ForkJoinPool

Author: 手打丸子 | Published: 2019-01-05 21:59
    1. Why use the fork/join framework?

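    ForkJoinPool is a complement to ExecutorService, not a replacement for it.

    It is not suited to every scenario;
    it is especially well suited to divide-and-conquer, recursive algorithms.

    In Java 8, parallel streams and the async methods of CompletableFuture (when no Executor is passed) are built on ForkJoinPool and by default run on ForkJoinPool.commonPool().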
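The CompletableFuture part of this claim can be checked with a minimal sketch (the class name CompletableFutureDefaultPool is hypothetical). It calls supplyAsync() without an Executor and reports whether the supplier ran on a worker of ForkJoinPool.commonPool(). One caveat: CompletableFuture only uses the common pool when ForkJoinPool.getCommonPoolParallelism() is greater than 1; otherwise it falls back to a new thread per task, so on a machine with one or two cores this sketch reports false.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class CompletableFutureDefaultPool {

    // Runs supplyAsync() with no explicit Executor and reports whether the
    // supplier executed on a worker thread of ForkJoinPool.commonPool()
    static boolean defaultAsyncRunsOnCommonPool() {
        return CompletableFuture.supplyAsync(() -> {
            Thread t = Thread.currentThread();
            return t instanceof ForkJoinWorkerThread
                    && ((ForkJoinWorkerThread) t).getPool() == ForkJoinPool.commonPool();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("supplyAsync ran on commonPool: " + defaultAsyncRunsOnCommonPool());
    }
}
```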

    This article covers which scenarios suit ForkJoinPool and which do not, and digs into why some scenarios are a poor fit.

    2. A simple fork/join example
      Use RecursiveTask to implement a summation with divide and conquer: sum each segment separately, then combine the partial sums.
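    import java.util.concurrent.RecursiveTask;

    public class SumTask extends RecursiveTask<Long> {

        // Ranges smaller than this are summed directly instead of being split
        private static final int THRESHOLD = 100;

        private final int start; // inclusive
        private final int end;   // inclusive

        public SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start < THRESHOLD) {
                // Small range: compute the sum directly (long avoids int overflow)
                long sumResult = 0;
                for (int i = start; i <= end; i++) {
                    sumResult += i;
                }
                return sumResult;
            }
            // Large range: split into two non-overlapping halves
            int middle = (start + end) >>> 1;
            SumTask leftSum = new SumTask(start, middle);
            SumTask rightSum = new SumTask(middle + 1, end);
            leftSum.fork();                        // queue the left half for asynchronous execution
            long rightResult = rightSum.compute(); // compute the right half in the current thread
            return leftSum.join() + rightResult;   // wait for the left half and combine
        }

        public static void main(String[] args) {
            SumTask sumTask = new SumTask(1, 999999);
            // main is not a ForkJoinWorkerThread, so fork() submits the task to ForkJoinPool.commonPool()
            sumTask.fork();
            System.out.println("result:" + sumTask.join()); // result:499999500000
        }
    }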
    

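    The default constructor sizes the pool from Runtime.availableProcessors(): its parallelism (the target number of active worker threads) equals the number of available processors, as the JDK 8 source shows: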

    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
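A small sketch (the class name DefaultParallelism is hypothetical) prints the values involved. Note that the shared ForkJoinPool.commonPool() is configured separately from new ForkJoinPool(): its parallelism is typically availableProcessors() - 1 (reported as at least 1) and can be changed with the java.util.concurrent.ForkJoinPool.common.parallelism system property.

```java
import java.util.concurrent.ForkJoinPool;

public class DefaultParallelism {

    // Parallelism of a pool created with the no-arg constructor
    static int defaultPoolParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("availableProcessors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("new ForkJoinPool():  " + defaultPoolParallelism());
        // The common pool is sized separately and may differ from the line above
        System.out.println("commonPool():        " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```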
    

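    To use a custom pool, for example one with a parallelism of 100, you can write the following.
    Keep in mind, though, that however many threads you configure, only as many as there are CPU cores can actually run at the same time, so for CPU-bound work such as this summation a larger pool generally does not make the framework any faster: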

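    // Alternative main() for SumTask (needs import java.util.concurrent.ForkJoinPool)
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(100); // parallelism = 100
        SumTask sumTask = new SumTask(1, 999999);
        // invoke() submits the task to this pool and waits for its result
        System.out.println("result:" + forkJoinPool.invoke(sumTask));
        forkJoinPool.shutdown();
    }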
    

    Let's look at what fork() does (JDK 8 source):

    /**
     * Arranges to asynchronously execute this task in the pool the
     * current task is running in, if applicable, or using the {@link
     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized.  Subsequent modifications to the state of this
     * task or any data it operates on are not necessarily
     * consistently observable by any thread other than the one
     * executing it unless preceded by a call to {@link #join} or
     * related methods, or a call to {@link #isDone} returning {@code
     * true}.
     *
     * @return {@code this}, to simplify usage
     */
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
    

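    If the current thread is not a ForkJoinWorkerThread, the task is pushed into the shared common pool, ForkJoinPool.common (ForkJoinPool is itself an ExecutorService).
    If the current thread is already a ForkJoinWorkerThread, the task is pushed onto that thread's own workQueue.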
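The worker-thread branch of fork() can be observed with a short sketch (the class and task names are hypothetical). A parent task running in a custom pool forks a child; the child records ForkJoinTask.getPool(), which returns the pool hosting the current thread (or null outside any ForkJoinPool). Whether the child is popped back by the parent's own worker or stolen by another worker, it runs inside the same pool.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkTargetDemo {

    // Child task: reports which pool hosts the thread that runs it
    static class WhereAmI extends RecursiveTask<ForkJoinPool> {
        @Override
        protected ForkJoinPool compute() {
            return ForkJoinTask.getPool(); // null if not running in any ForkJoinPool
        }
    }

    // Parent task: runs on a worker of the custom pool and forks a child from there
    static class Parent extends RecursiveTask<ForkJoinPool> {
        @Override
        protected ForkJoinPool compute() {
            WhereAmI child = new WhereAmI();
            child.fork();        // current thread is a ForkJoinWorkerThread: push onto its own workQueue
            return child.join(); // runs here or is stolen, but always inside this pool
        }
    }

    // Returns the pool in which a task forked from inside `pool` actually ran
    static ForkJoinPool poolOfForkedChild(ForkJoinPool pool) {
        return pool.invoke(new Parent());
    }

    public static void main(String[] args) {
        ForkJoinPool custom = new ForkJoinPool(2);
        try {
            System.out.println("child ran in the custom pool: " + (poolOfForkedChild(custom) == custom));
        } finally {
            custom.shutdown();
        }
    }
}
```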

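    This is where it differs from an ordinary thread pool: a task forked by a worker is not handed to a single queue shared by the whole pool but placed on the forking thread's own workQueue.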

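    Now let's look at how tasks in a ForkJoinPool are run:
    a. A worker takes tasks from its own workQueue in LIFO (last-in, first-out) order and executes them until its queue is empty.
    b. It then checks whether other ForkJoinWorkerThreads have pending tasks and, if so, steals one (work stealing). Steals take tasks from the other end of the victim's queue, in FIFO (first-in, first-out) order, which reduces contention with the queue's owner. When a worker is waiting in join() for a task that was stolen from it, it preferentially looks at the thread that stole it, if there is one.
    c. When a task finishes, its result is returned.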
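The LIFO/FIFO split in steps a and b can be modeled with a double-ended queue. This is a hypothetical, simplified sketch, not the JDK's WorkQueue: the real queue is a lock-free array in which the owner and the thieves work at opposite ends so they rarely contend, while this version simply takes a lock to keep the ordering easy to see.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of one worker's queue: the owner pushes and pops at the
// top (LIFO), other workers steal from the base (FIFO). Uses a lock for clarity.
public class WorkQueueSketch<T> {

    private final Deque<T> deque = new ArrayDeque<>();

    // Owner thread: fork() pushes a new task on top
    public synchronized void push(T task) {
        deque.addLast(task);
    }

    // Owner thread: takes the most recently pushed task (LIFO); null if empty
    public synchronized T pop() {
        return deque.pollLast();
    }

    // Other workers: steal the oldest task from the base (FIFO); null if empty
    public synchronized T steal() {
        return deque.pollFirst();
    }

    public static void main(String[] args) {
        WorkQueueSketch<String> queue = new WorkQueueSketch<>();
        queue.push("task-1");
        queue.push("task-2");
        queue.push("task-3");
        System.out.println("owner pops:   " + queue.pop());   // task-3
        System.out.println("thief steals: " + queue.steal()); // task-1
        System.out.println("owner pops:   " + queue.pop());   // task-2
        System.out.println("thief steals: " + queue.steal()); // null
    }
}
```

The owner keeps working on the newest, usually smallest subtasks, while thieves take the oldest ones, which in a divide-and-conquer computation tend to be the largest chunks of remaining work.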

    3. Java 8 parallel streams
      Java 8 parallel streams also run on ForkJoinPool:
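    // myList and longRunningOperation() are placeholders; collect() is the terminal operation that actually runs the stream
    myList.parallelStream().map(obj -> longRunningOperation()).collect(Collectors.toList());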
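To see which threads a parallel stream uses by default, this sketch (the class name ParallelStreamThreads is hypothetical) records every thread that processes an element. Besides common-pool workers, the calling thread itself also takes part in the work, which the check allows for.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

public class ParallelStreamThreads {

    // Runs a parallel stream from the current thread and checks that every element
    // was processed either by the calling thread or by a worker of the common pool
    static boolean ranOnCallerOrCommonPool() {
        Thread caller = Thread.currentThread();
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000).parallel().forEach(i -> threads.add(Thread.currentThread()));
        return threads.stream().allMatch(t -> t == caller
                || (t instanceof ForkJoinWorkerThread
                    && ((ForkJoinWorkerThread) t).getPool() == ForkJoinPool.commonPool()));
    }

    public static void main(String[] args) {
        System.out.println("only caller + commonPool workers: " + ranOnCallerOrCommonPool());
    }
}
```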
    

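    By default a parallel stream uses the common pool, ForkJoinPool.commonPool(). To run it on a dedicated pool, start the stream from inside a task submitted to that pool, as below. Note that this relies on how the stream implementation happens to fork its subtasks from the current worker thread, which the Stream API itself does not specify: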

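    // Needs java.util.concurrent.ForkJoinPool and ForkJoinTask;
    // firstRange and secondRange are assumed to be collections such as List<Integer>
    ForkJoinPool forkJoinPool = new ForkJoinPool(3);
    ForkJoinTask<?> first = forkJoinPool.submit(() -> {
        // Started on a worker of forkJoinPool, so the stream's subtasks stay in that pool
        firstRange.parallelStream().forEach(number -> {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        });
    });

    ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
    ForkJoinTask<?> second = forkJoinPool2.submit(() -> {
        secondRange.parallelStream().forEach(number -> {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    });

    first.join();   // wait for both streams to finish
    second.join();
    forkJoinPool.shutdown();
    forkJoinPool2.shutdown();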
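A minimal sketch (the class name CustomPoolParallelStream is hypothetical) that checks the dedicated-pool approach: it starts a parallel stream inside a task submitted to a custom ForkJoinPool and verifies that every element was processed by a worker of that pool. This reflects the current JDK's stream implementation rather than a documented guarantee of the Stream API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

public class CustomPoolParallelStream {

    // Starts a parallel stream inside a task submitted to `pool` and reports
    // whether every element was processed by one of that pool's workers
    static boolean streamStayedInPool(ForkJoinPool pool) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        pool.submit(() -> IntStream.range(0, 10_000)
                        .parallel()
                        .forEach(i -> threads.add(Thread.currentThread())))
            .join(); // wait until the stream has finished
        return threads.stream().allMatch(t -> t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == pool);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            System.out.println("stream ran only in the custom pool: " + streamStayedInPool(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```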
    

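    4. Why it is not suited to tasks that block, such as I/O
    In an ordinary ExecutorService, when one thread blocks, the pool's other threads keep taking tasks from the shared queue; pools meant for blocking work are usually given many threads for exactly this reason.
    A ForkJoinWorkerThread, however, works through the tasks in its local workQueue one at a time and runs each one to completion. There is no step that puts a task back, so a task that blocks does not make the worker switch to another task: the worker simply stays blocked. Because a ForkJoinPool's parallelism is usually close to the CPU count, a handful of blocking tasks can leave no worker free for the rest of the work.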
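The starvation effect can be reproduced with a small sketch (the class name BlockingStarvationDemo and the timeouts are arbitrary choices). Task A blocks on a CountDownLatch until task B signals it. With parallelism 1, A holds the only worker, so B cannot start until A's wait times out; with parallelism 2, a second worker runs B and releases A. The latch is awaited with a timeout so the demo cannot hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

public class BlockingStarvationDemo {

    // Task A blocks until task B signals it (or the timeout expires).
    // Returns true if B managed to run and release A in time.
    static boolean blockedTaskWasReleased(int parallelism, long timeoutMillis) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        CountDownLatch aStarted = new CountDownLatch(1);
        CountDownLatch signalFromB = new CountDownLatch(1);
        try {
            ForkJoinTask<Boolean> a = pool.submit(() -> {
                aStarted.countDown();
                // A plain blocking wait: the worker running A can do nothing else meanwhile
                return signalFromB.await(timeoutMillis, TimeUnit.MILLISECONDS);
            });
            aStarted.await();                    // A now occupies one worker
            pool.submit(signalFromB::countDown); // B only needs a free worker
            return a.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("parallelism 1, released: " + blockedTaskWasReleased(1, 500));
        System.out.println("parallelism 2, released: " + blockedTaskWasReleased(2, 500));
    }
}
```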
