美文网首页
Java并发编程-分而治之:Fork&Join线程池

Java并发编程-分而治之:Fork&Join线程池

作者: agile4j | 来源:发表于2018-09-12 08:23 被阅读21次

参考资料:《实战Java高并发程序设计》


1. 简介

  • 分而治之是一个非常有效的处理大量数据的方法。

  • Fork一词原义是吃饭用的叉子,也有分叉的意思。Linux中使用fork()函数来创建子进程,从而使系统进程可以多一个执行分支。Java中也沿用了类似的命名。

  • Join的含义和Thread.join()类似,表示等待。也就是使用fork()后系统多了一个执行分支(线程),所以需要等待这个线程执行完毕,才有可能得到最终结果。

  • 在实际使用中,如果毫无顾忌地使用fork()开启线程,那么很有可能导致系统因开启过多线程而严重影响性能。所以JDK给出了一个ForkJoinPool线程池,对于fork方法并不急着开启线程,而是提交给ForkJoinPool线程池处理,以节省系统资源

  • 使用Fork&Join进行进行数据处理的总体结构如下:


    forkJoin.png-208.1kBforkJoin.png-208.1kB
  • 由于线程池的优化,提交的任务和线程数量并不是一对一的关系。在绝大多数情况下,一个物理线程实际上是需要处理多个逻辑任务的。因此,每个线程必然需要拥有一个任务队列。所以,可能遇到这样一种情况:线程A已经把自己的任务都执行完成了,而线程B还有一堆任务等着处理。此时,线程A就会“帮助”线程B,从线程B的任务队列中拿一个任务过来处理,尽可能地达到平衡。示意图:

    queue.png-63.8kBqueue.png-63.8kB
  • 值得注意的是:当一个线程试图帮助另一个线程时,总是从任务队列的尾部拿数据,而线程试图执行自己的任务时,则是从头部开始拿。这是为了避免数据竞争

2.ForkJoinPool 和 ForkJoinTask

  • 先来看一下ForkJoinPool的一个重要的接口:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
  • 我们可以向ForkJoinPool线程池提交一个ForkJoinTask任务。
  • 所谓ForkJoinTask任务就是支持fork()分解join()等待的任务。
  • ForkJoinTask有两个重要的子类:
  1. RecursiveAction没有返回值的任务
  2. RecursiveTask携带返回值的任务
  • 下面通过一个计算数列求和的demo,来展示Fork&Join的使用:
public class Test {

    public static class CountTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 10000;
        private long start;
        private long end;

        public CountTask(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            boolean canCompute = (end - start) < THRESHOLD;
            if (canCompute) {
                for (long i = start; i <= end; i++) {
                    sum += i;
                }
            } else {
                // 分成100个小任务
                long step = (start + end) / 100;
                ArrayList<CountTask> subTasks = new ArrayList<>();
                long pos = start;
                for (int i = 0; i < 100; i++) {
                    long lastOne = pos + step;
                    if (lastOne > end) {
                        lastOne = end;
                    }
                    CountTask subTask = new CountTask(pos, lastOne);
                    pos += step + 1;
                    subTasks.add(subTask);
                    subTask.fork();
                }
                for (CountTask t : subTasks) {
                    sum += t.join();
                }
            }
            return sum;
        }
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = forkJoinPool.submit(new CountTask(0, 200000L));
        try {
            long result = task.get();
            System.out.println("sum=" + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// 输出:
// sum=20000100000
  • 上述代码在执行get()方法时,如果任务没有结束,那么主线程就会在get()方法上等待。
  • 使用ForkJoin时要注意,如果任务的划分层次很深,一直得不到返回,那么可能出现两种情况:
  1. 系统内的线程数量越积越多,导致性能严重下降。
  2. 函数的调用层次变得很深,最终导致栈溢出。
  • 此外,ForkJoin线程池使用一个无锁的栈来管理空闲线程。如果一个工作线程暂时取不到可用的任务,则可能会被挂起,挂起的线程将会被压入由线程池维护的栈中。待将来有任务可用时,再从中唤醒这些线程。

end

相关文章

网友评论

      本文标题:Java并发编程-分而治之:Fork&Join线程池

      本文链接:https://www.haomeiwen.com/subject/eocmgftx.html