美文网首页
parallelStream和ForkJoinPool的使用风险

parallelStream和ForkJoinPool的使用风险

作者: 怕水河马 | 来源:发表于2021-10-09 17:41 被阅读0次

    1.parallelStream和ForkJoinPool

    parallelStream是JDK官方在1.8版本中增加的语法级新特性,主要的特点就是可以帮助用户在流式开发时快速实现并行编程,从而快速简单的实现多线程运行,同时降低用户对于线程池维护带来的复杂性。
    
    使用parallelStream的典型代码:
    list.parallelStream().forEach(o -> {
        o.doSomething();
        ...
    });
    
    ForkJoinPool是JDK官方在1.7版本中引入的特定线程池,主要应用于基于递归调用策略的任务流多线程调用场景。
    

    2.风险点

    虽然parallelStream的流式编程带来的极大的多线程开发便利性,但同时也带来了一个隐含的逻辑,且并未在接口注释中说明:
    
        /**
         * Returns a possibly parallel {@code Stream} with this collection as its
         * source.  It is allowable for this method to return a sequential stream.
         *
         * <p>This method should be overridden when the {@link #spliterator()}
         * method cannot return a spliterator that is {@code IMMUTABLE},
         * {@code CONCURRENT}, or <em>late-binding</em>. (See {@link #spliterator()}
         * for details.)
         *
         * @implSpec
         * The default implementation creates a parallel {@code Stream} from the
         * collection's {@code Spliterator}.
         *
         * @return a possibly parallel {@code Stream} over the elements in this
         * collection
         * @since 1.8
         */
    
    以上是该接口的全部注释,这里所谓的隐含逻辑是,并非每一个独立调用parallelStream的代码都会独立维护运行一个多线程的策略,而是JDK默认会调用同一个由运行环境维护的ForkJoinPool线程池,也就是说,无论在哪个地方写了list.parallelStream().forEach();这样一段代码,底层实际都会由一套ForkJoinPool的线程池进行运行,一般线程池运行会遇到的冲突、排队等问题,这里同样会遇到,且会被隐藏在代码逻辑中。
    
    这里最危险的当然就是线程池的deadlock,一旦发生deadlock,所有调用parallelStream的地方都会被阻塞,无论你是否知道其他人是否这样书写了代码。
    

    3.会引起线程池deadlock的场景

    3.1 最常见的线程池内部阻塞

    以这段代码为例
    list.parallelStream().forEach(o -> {
        o.doSomething();
        ...
    });
    
    只要在doSomething()中有任何导致当前执行被hold住的情况,则由于parallelStream完成时会执行join操作,任何一个没有完成迭代都会导致join操作被hold住,进而导致当前线程被卡住。
    典型的操作有:线程被wait,锁,循环锁,外部操作卡住等。
    

    3.2 迭代时对象被修改

    list.parallelStream().forEach()时,如果不甚修改了list对象的长度,则也有可能导致join操作无法完成。
    

    3.3 static代码块中执行迭代

    如果你在一个类的static代码块中写了迭代,并且执行了lambda表达式,则也会导致线程被锁住。
    class A {
        static {
            list.parallelStream().forEach(n -> {
                n.doSomething();
            })
        }
    }
    
    这里的原因是执行lambda表达式的前提是当前类A必须完成类初始化,但初始化又由于static代码块无法执行,而导致程序互锁,最终导致卡住
    

    4.建议

    1、如果你开发的是V5这样的复杂系统,不建议直接使用parallelStream执行多线程操作
    2、如果你真的希望在V5这样的复杂系统中使用parallelStream,请考虑构建独立的ForkJoinPool,使用如下姿势调用:
    ForkJoinPool forkJoinPool1 = new ForkJoinPool(20);
    ForkJoinTask<?> fs = forkJoinPool.submit(() -> list.parallelStream().forEach((n) -> {
            n.doSomething();
        }));
    try {
        result = fs.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e){
        e.printStackTrace();
    }
    forkJoinPool.shutdown();
     
    即手动制定线程池,不过个人建议,这样的写法还不如自己写一般的多线程代码来得简单。
    
    3、当然,如果你开发的微服务等类似的相对简单系统,则可以直接使用parallelStream,因为系统的简单性,相关风险会很低。
    
    

    相关文章

      网友评论

          本文标题:parallelStream和ForkJoinPool的使用风险

          本文链接:https://www.haomeiwen.com/subject/rwnqoltx.html