1.parallelStream和ForkJoinPool
parallelStream是JDK官方在1.8版本中增加的语法级新特性,主要的特点就是可以帮助用户在流式开发时快速实现并行编程,从而快速简单的实现多线程运行,同时降低用户对于线程池维护带来的复杂性。
使用parallelStream的典型代码:
list.parallelStream().forEach(o -> {
o.doSomething();
...
});
ForkJoinPool是JDK官方在1.7版本中引入的特定线程池,主要应用于基于递归调用策略的任务流多线程调用场景。
2.风险点
虽然parallelStream的流式编程带来的极大的多线程开发便利性,但同时也带来了一个隐含的逻辑,且并未在接口注释中说明:
/**
* Returns a possibly parallel {@code Stream} with this collection as its
* source. It is allowable for this method to return a sequential stream.
*
* <p>This method should be overridden when the {@link #spliterator()}
* method cannot return a spliterator that is {@code IMMUTABLE},
* {@code CONCURRENT}, or <em>late-binding</em>. (See {@link #spliterator()}
* for details.)
*
* @implSpec
* The default implementation creates a parallel {@code Stream} from the
* collection's {@code Spliterator}.
*
* @return a possibly parallel {@code Stream} over the elements in this
* collection
* @since 1.8
*/
以上是该接口的全部注释,这里所谓的隐含逻辑是,并非每一个独立调用parallelStream的代码都会独立维护运行一个多线程的策略,而是JDK默认会调用同一个由运行环境维护的ForkJoinPool线程池,也就是说,无论在哪个地方写了list.parallelStream().forEach();这样一段代码,底层实际都会由一套ForkJoinPool的线程池进行运行,一般线程池运行会遇到的冲突、排队等问题,这里同样会遇到,且会被隐藏在代码逻辑中。
这里最危险的当然就是线程池的deadlock,一旦发生deadlock,所有调用parallelStream的地方都会被阻塞,无论你是否知道其他人是否这样书写了代码。
3.会引起线程池deadlock的场景
3.1 最常见的线程池内部阻塞
以这段代码为例
list.parallelStream().forEach(o -> {
o.doSomething();
...
});
只要在doSomething()中有任何导致当前执行被hold住的情况,则由于parallelStream完成时会执行join操作,任何一个没有完成迭代都会导致join操作被hold住,进而导致当前线程被卡住。
典型的操作有:线程被wait,锁,循环锁,外部操作卡住等。
3.2 迭代时对象被修改
list.parallelStream().forEach()时,如果不甚修改了list对象的长度,则也有可能导致join操作无法完成。
3.3 static代码块中执行迭代
如果你在一个类的static代码块中写了迭代,并且执行了lambda表达式,则也会导致线程被锁住。
class A {
static {
list.parallelStream().forEach(n -> {
n.doSomething();
})
}
}
这里的原因是执行lambda表达式的前提是当前类A必须完成类初始化,但初始化又由于static代码块无法执行,而导致程序互锁,最终导致卡住
4.建议
1、如果你开发的是V5这样的复杂系统,不建议直接使用parallelStream执行多线程操作
2、如果你真的希望在V5这样的复杂系统中使用parallelStream,请考虑构建独立的ForkJoinPool,使用如下姿势调用:
ForkJoinPool forkJoinPool1 = new ForkJoinPool(20);
ForkJoinTask<?> fs = forkJoinPool.submit(() -> list.parallelStream().forEach((n) -> {
n.doSomething();
}));
try {
result = fs.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e){
e.printStackTrace();
}
forkJoinPool.shutdown();
即手动制定线程池,不过个人建议,这样的写法还不如自己写一般的多线程代码来得简单。
3、当然,如果你开发的微服务等类似的相对简单系统,则可以直接使用parallelStream,因为系统的简单性,相关风险会很低。
网友评论