什么是流?
Stream是java8中新增加的一个特性,被java统称为流.
Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。
Stream 就如同一个迭代器,单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。
而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:
- 1.0 -1.4 java.util.Thread
- 5.0 java.util.concurrent
- 6.0 JUC 等
- 7.0 ForkjoinPool 框架, Phaser等
- 8.0 Lamda
parallelStream是什么
parallelStream其实就是一个并行执行的流.它通过默认的ForkJoinPool,可能提高你的多线程任务的速度.
parallelStream的作用
Stream具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作,因此像以下的程式片段:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);numbers.parallelStream() .forEach(out::println);
你得到的展示顺序不一定会是1、2、3、4、5、6、7、8、9,而可能是任意的顺序,就forEach()这个操作來讲,如果平行处理时,希望最后顺序是按照原来Stream的数据顺序,那可以调用forEachOrdered()。例如:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);numbers.parallelStream() .forEachOrdered(out::println);
注意:如果forEachOrdered()中间有其他如filter()的中介操作,会试着平行化处理,然后最终forEachOrdered()会以原数据顺序处理,因此,使用forEachOrdered()这类的有序处理,可能会(或完全失去)失去平行化的一些优势,实际上中介操作亦有可能如此,例如sorted()方法。
forkjion 与 ParallelStreams
上文中已经提到了在Java 8引入了自动并行化的概念。它能够让一部分Java代码自动地以并行的方式执行,也就是我们使用了ForkJoinPool的ParallelStream
Java 8为ForkJoinPool添加了一个通用线程池ForkJoinPool.commonPool ,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。
比如下面的代码用来遍历列表中的元素并执行需要的操作:
myList.parallelStream
.map(this::retrieveFromA)
.map(this::processUsingB)
.forEach(this::saveToC)
对于列表中的元素的操作都会以并行的方式执行。forEach方法会为每个元素的计算操作创建一个任务,该任务会被前文中提到的ForkJoinPool中的通用线程池处理。以上的并行计算逻辑当然也可以使用ThreadPoolExecutor完成,但是就代码的可读性和代码量而言,使用parallelStream明显更胜一筹。
对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量
parallelStream的问题
线程安全
重点说下Parallel Stream并行流使用的一些坑。一个是使用.parallelStream()之后,在接下来的管道中做任何业务逻辑都需要确保线程安全,比如。
List<Int..> result = new ArrayList<>();
tmpList.parallerStream()
.foEach(item -> {
result.add(item);
});
由于ArrayList并不是线程安全的,这样使用就会出现线程安全问题,所以注意了,使用parallerStream必须确保线程安全问题。
可能很多人自从用了stearm之后就很少写for循环了,这不是一个好的习惯。比如只是简单的遍历一遍int数组,那就不要使用stearm,直接使用for循环性能会更好,因为stream你只是用着简单,但你看下源码,封装很多复杂逻辑,原本只是简单的数组遍历,你用stream之后将会创建出很多中间对象,还有一大堆的流式调用逻辑。
线程饥饿
在并发量高的接口中不要直接使用stream的parallerStream处理耗时的逻辑,因为并行流运行时,内部使用的fork-join线程池是整个JVM进程全局唯一的线程池。而这个线程池其默认线程数为处理器核心数。
Runtime.getRuntime().availableProcessors()
可以通过配置修改这个值 。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
一般不建议修改,最好是自己创建一个Fork-Join线程池来用,因为你如果改了,虽然对你当前的业务逻辑来说,算是调优了,但对于项目中其它地方只是用来做非耗时的并行流运算,性能就差了。
我们可以看到 Parallel Stream,默认采用的是一个 ForkJoinPool.commonPool 的线程池,这样我们就算使用了 Parallel Stream,由于所有使用并行流parallerStream的地方都是使用同一个Fork-Join线程池,而线程池线程数仅为cpu的核心数。我们可以来写个例子验证是不是整个java进程使用的parallerStream都是用的同一个进程.
public class PStream {
public static void main(String[] args) throws InterruptedException {
final List<Integer> list = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
list.add(1);
}
for (int i = 1; i <= 50; i++) {
new Thread("test-" + i) {
String currentThreaName = this.getName();
@Override
public void run() {
list.parallelStream()
.forEach(numbser -> {
Thread c = Thread.currentThread();
System.out.println(currentThreaName + "===> "
+ c.getClass().getName() + ":" + c.getName() + ":" + c.getId());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}.start();
}
假设,分布式服务中,有一个接口,用于批量处理数据,如果每次消费者调用都用了批量处理1000条记录的过滤,假设一条记录的过滤逻辑需要耗时4ms( 涉及到redis缓存的读),如果有40个请求并发过滤,那就是40000条记录交给2个线程去处理(cpu核心线程数),很容易导致服务消费端报错,一堆的接口调用超时异常,导致服务雪崩。后果很严重。原因如下
40个请求开启40个并行流parallerStream,40个并行流parallerStream使用同一个只有2个线程的Fork-Join线程池(2核8g机器),意味着40个请求争抢着执行任务。
假设一条记录的过滤耗时为4ms,在串行的情况下1000条记录应该只是4000ms。但如果是400000条记录争抢2个线程执行。 最坏的情况下,一个请求需要200000*4ms才能执行完成。就会导致接口调用超时。
总之,不要在高并发的接口中使用并行流,直接使用处理请求的线程执行就行,如果有需要,那就全局创建一个Fork-Join线程池自己切分任务来执行。
变通的方法
JVM 后台使用 fork/join 池,在 ForkJoinTask 的文档中,我们可以看到:
如果合适,安排一个异步执行的任务到当前正在运行的池中。如果任务不在ForkJoinPool()中,也可以调用ForkJoinPool.commonPool()获取新的池来执行。
自定义线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(8);
List<ProxyList> records = new ArrayList<>();
List<String> needDeleteList = forkJoinPool.submit(() -> records.parallelStream()
.map(ProxyList::getIpPort)
.filter(IProxyListTask::isFailed)
.collect(Collectors.toList())
).join();
整个代码依然比较优雅,在使用自定义的 ForkJoin 线程池之后,执行速度有了明显的提升。以前 5 分钟执行不完的任务现在 2 分钟之内就能全部执行完毕。
但是最关键的是,如果是用了自己定义的forkJoinPool, 还有必要用parallelStream 么?
总结
Parallel streams 是无法预测的,而且想要正确地使用它有些棘手。几乎任何parallel streams的使用都会影响程序中无关部分的性能,而且是一种无法预测的方式。。但是在调用stream.parallel() 或者parallelStream()时候在我的代码里之前我仍然会重新审视一遍他给我的程序究竟会带来什么问题,他能有多大的提升,是否有使用他的意义.
stream or parallelStream?
上面我们也看到了parallelStream所带来的隐患和好处,那么,在从stream和parallelStream方法中进行选择时,我们可以考虑以下几个问题:
- 是否需要并行
在回答这个问题之前,你需要弄清楚你要解决的问题是什么,数据量有多大,计算的特点是什么?并不是所有的问题都适合使用并发程序来求解,比如当数据量不大时,顺序执行往往比并行执行更快。毕竟,准备线程池和其它相关资源也是需要时间的。但是,当任务涉及到I/O操作并且任务之间不互相依赖时,那么并行化就是一个不错的选择。通常而言,将这类程序并行化之后,执行速度会提升好几个等级。 - 任务之间是否是独立的,没有竞争关系
对于问题2,如果任务之间是独立的,并且代码中不涉及到对同一个对象的某个状态或者某个变量的更新操作,那么就表明代码是可以被并行化的。 - 结果是否取决于调用顺序
由于在并行环境中任务的执行顺序是不确定的,因此对于依赖于顺序的任务而言,并行化也许不能给出正确的结果。
网友评论