美文网首页多线程
初识ForkJoinPool

初识ForkJoinPool

作者: tracy_668 | 来源:发表于2020-06-25 08:37 被阅读0次

    流式编程基础

    如下代码我们首先创建了一个 list,然后从 list 上获取流对象,并使用 foreach 进行遍历:

    
     @Test
        public void StreamTest() throws IOException {
            // 1.创建list
            ArrayList<String> arrayList = new ArrayList<>();
            for (int i = 0; i < 100; ++i) {
                arrayList.add(i + "");
            }
            arrayList.stream().forEach(e -> System.out.println(Thread.currentThread().getName() + " " + e));
        }
    
    

    运行上面代码,输出为:

    main 0
    main 1
    main 2
    main 3
    main 4
    ...
    
    

    上面打印元素使用的 main 线程顺序进行的,大家都知道我们可以把流转换为并行流,代码如下:

     @Test
        public void parallelStreamTest() throws IOException {
            // 1.创建list
            ArrayList<String> arrayList = new ArrayList<>();
            for (int i = 0; i < 100; ++i) {
                arrayList.add(i + "");
            }
            arrayList.parallelStream().forEach(e -> System.out.println(Thread.currentThread().getName() + " " + e));
        }
    
    

    运行上面代码输出如下:

    ForkJoinPool.commonPool-worker-6 94
    main 73
    main 74
    ForkJoinPool.commonPool-worker-6 95
    ForkJoinPool.commonPool-worker-1 39
    ForkJoinPool.commonPool-worker-3 69
    ForkJoinPool.commonPool-worker-3 70  
    ...
    
    

    上面代码则是使用 ForkJoinPool 的 common 线程池与 main 线程并行输出的,另外我们知道貌似无法对流式的并行处理的线程池进行定制,其内部使用的是整个 JVM 内唯一的 common 线程池。 真的是这样吗?

    猜执行结果

    上面我们介绍了流式编程的并行流,下面请看下面代码输出时候,打印的线程名称是什么:

     @Test
        public void forkPoolTest() {
            //代码示例1
               ForkJoinPool pool = new ForkJoinPool(3);
    
                // 1.创建list
                ArrayList<String> arrayList = new ArrayList<>();
                for (int i = 0; i < 100; ++i) {
                    arrayList.add(i + "");
                }
    
                pool.submit(() -> arrayList.parallelStream().forEach(e -> {
                    System.out.println(Thread.currentThread().getName() + " " + e);
                })).join();
                System.out.println("Main is over");
    
    
    
        }
    

    阅读上面代码,我们可以看到 main 线程向 forkjoin 线程池里面添加了一个任务,然后阻塞等待任务的完成,然后打印输出 Main is over。

    那么这与不提交任务到线程池,而是直接执行,如下代码,看起来没啥区别:

     @Test
        public void parallelStreamTest() throws IOException {
            // 1.创建list
            ArrayList<String> arrayList = new ArrayList<>();
            for (int i = 0; i < 100; ++i) {
                arrayList.add(i + "");
            }
            arrayList.parallelStream().forEach(e -> System.out.println(Thread.currentThread().getName() + " " + e));
        }
    

    如上代码,我们也是在 main 线程等待打印任务执行完毕后,在输出 Main is over。

    其实不然,在原来的代码示例 1 中,我们创建了一个名称为 pool 的线程池,然后向其中提交了一个任务。 pool 中提交任务后,会创建一个 ForkJoinWorkerThread 类型的线程,来执行我们提交的任务,也就是执行:

    arrayList.stream().forEach(e -> System.out.println(Thread.currentThread().getName() + " " + e));
    

    运行上面代码,按理说是 ForkJoinPool 中的 common 线程池线程并行,执行打印输出。但是运行后,你会发现打印任务的线程却是我们自己创建的 pool 中的线程,也就是我们使用自己创建的 pool 替代了并行流默认的 ForkJoinPool 中的 common 线程池。

    究其原因是当我们调用并行流的 forEach 方法时候,会调用到 ForkJoinTask 的 fork 方法进行子任务切分:

        public final ForkJoinTask<V> fork() {
            Thread t;
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
                ((ForkJoinWorkerThread)t).workQueue.push(this);
            else
                ForkJoinPool.common.externalPush(this);
            return this;
        }
    
    

    由于调用 forEach 的是我们自己创建的 pool 里面的线程(其是 ForkJoinWorkerThread 类型的),所以会把切分的任务添加到我们调用线程所在的队列里面,而不是添加到了 common 线程池里面。

    相关文章

      网友评论

        本文标题:初识ForkJoinPool

        本文链接:https://www.haomeiwen.com/subject/xbojfktx.html