美文网首页
ParallelStream的那些坑

ParallelStream的那些坑

作者: 易霂 | 来源:发表于2020-10-25 23:56 被阅读0次

很多同学喜欢使用lambda表达式,它允许你定义短小精悍的函数,体现你高超的编码水平。当然,这个功能在某些以代码行数来衡量工作量的公司来说,就比较吃亏一些。

比如下面的代码片段,让人阅读的时候就像是读诗一样。但是一旦用不好,也是会要命的。

List <integer style="box-sizing: border-box;">transactionsIds = 
widgets.stream() 
             .filter(b -> b.getColor() == RED) 
             .sorted((x,y) -> x.getWeight() - y.getWeight()) 
             .mapToInt(Widget::getWeight) 
             .sum();

这段代码有一个关键的函数,那就是stream。通过它,可以将一个普通的list,转化为流,然后就可以使用类似于管道的方式对list进行操作。总之,用过的都说好。

对这些函数还不是太熟悉?可以参考:《到处是map、flatMap,啥意思?》

问题来了

假如我们把stream换成parallelStream,会发生什么情况?

根据字面上的意思,流会从串行 变成并行。

既然是并行,那用屁股想一想,就知道这里面肯定会有线程安全问题。不过我们这里讨论的并不是要你使用线程安全的集合,这个话题太低级。现阶段,知道在线程不安全的环境中使用线程安全的集合,已经是一个基本的技能。

这次踩坑的地方,是并行流的性能问题。

我们用代码来说话。

下面的代码,开启了8个线程,这8个线程都在使用并行流进行数据计算。在执行的逻辑中,我们让每个任务都sleep 1秒钟,这样就能够模拟一些I/O请求的耗时等待。

使用stream,程序会在30秒后返回,但我们期望程序能够在1秒多返回,因为它是并行流,得对得起这个称号。

测试发现,我们等了好久,任务才执行完毕。

static void paralleTest() { 
    List <integer style="box-sizing: border-box;">numbers = Arrays.asList( 
            0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 
            10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 
            20, 21, 22, 23, 24, 25, 26, 27, 28, 29 
    ); 
    final long begin = System.currentTimeMillis(); 
    numbers.parallelStream().map(k -> { 
        try { 
            Thread.sleep(1000); 
            System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
        return k; 
    }).collect(Collectors.toList()); 
} 

public static void main(String[] args) { 
//    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); 
    new Thread(() -> paralleTest()).start(); 
    new Thread(() -> paralleTest()).start(); 
    new Thread(() -> paralleTest()).start(); 
    new Thread(() -> paralleTest()).start(); 
    new Thread(() -> paralleTest()).start(); 
    new Thread(() -> paralleTest()).start(); 
    new Thread(() -> paralleTest()).start(); 
    new Thread(() -> paralleTest()).start(); 
}

实际上,在不同的机器上执行,这段代码花费的时间都不一样。

既然是并行,那肯定得有个并行度。太低了,体现不到并行的能能力;太大了,又浪费了上下文切换的时间。我是很沮丧的发现,很多高级研发,将线程池的各种参数背的滚瓜烂熟,各种调优,竟然敢睁一只眼闭一只眼的在I/O密集型业务中用上parallelStream。

要了解这个并行度,我们需要查看具体的构造方法。在ForkJoinPool类中找到这样的代码。

try {  // ignore exceptions in accessing/parsing properties 
    String pp = System.getProperty 
        ("java.util.concurrent.ForkJoinPool.common.parallelism"); 
    if (pp != null) 
        parallelism = Integer.parseInt(pp); 
    fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( 
        "java.util.concurrent.ForkJoinPool.common.threadFactory"); 
    handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( 
        "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); 
} catch (Exception ignore) { 
} 

if (fac == null) { 
    if (System.getSecurityManager() == null) 
        fac = defaultForkJoinWorkerThreadFactory; 
    else // use security-managed default 
        fac = new InnocuousForkJoinWorkerThreadFactory(); 
} 
if (parallelism < 0 && // default 1 less than #cores 
    (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) 
    parallelism = 1; 
if (parallelism > MAX_CAP) 
    parallelism = MAX_CAP; 

可以看到,并行度到底是多少,是由下面的参数来控制的。如果无法获取这个参数,则默认使用 CPU个数-1 的并行度。

可以看到,这个函数是为了计算密集型业务去设计的。如果你喂给它一大堆任务,它就会由并行执行退变成类似于串行的效果。

-Djava.util.concurrent.ForkJoinPool.common.parallelism=N 

即使你使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=N设置了一个初始值大小,它依然有问题。

因为,parallelism这个变量是final的,一旦设定,不允许修改。也就是说,上面的参数只会生效一次。

张三可能使用下面的代码,设置了并行度大小为20。

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); 

李四可能用同样的方式,设置了这个值为30。那实际在项目中用的是哪个值,那就得问JVM是怎么加载的类信息了。

这种方式并不太非常靠谱。

一种解决方式

我们可以通过提供外置的forkjoinpool,也就是改变提交方式,来实现不同类型的任务分离。

代码如下所示,通过显式的代码提交,即可实现任务分离。

ForkJoinPool pool = new ForkJoinPool(30); 

final long begin = System.currentTimeMillis(); 
try { 
    pool.submit(() -> 
            numbers.parallelStream().map(k -> { 
                try { 
                    Thread.sleep(1000); 
                    System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
                return k; 
            }).collect(Collectors.toList())).get(); 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} catch (ExecutionException e) { 
    e.printStackTrace(); 
} 

这样,不同的场景,就可以拥有不同的并行度。这种方式和CountDownLatch有异曲同工之妙,我们需要手动管理资源。

使用了这种方式,代码量增加,已经和优雅关系不大了,不仅不优雅,而且丑的要命。白天鹅变成了丑小鸭,你还会爱它么?

相关文章

  • ParallelStream的那些坑

    很多同学喜欢使用lambda表达式,它允许你定义短小精悍的函数,体现你高超的编码水平。当然,这个功能在某些以代码行...

  • parallelStream遇到的坑

    Java parallelStream遇到的坑 线程安全问题 paralleStream是并行流,并行就意味着是多...

  • 自定义parallelStream的thread pool

    自定义parallelStream的thread pool 简介 之前我们讲到parallelStream的底层使...

  • parallelStream

    转自:深入浅出parallelStream 1. 什么是流? Stream是java8中新增加的一个特性,被jav...

  • java8的ParallelStream踩坑记录

    java8中的新特性stream流处理,让集合操作变得非常的简单,但是因为没有源码支持,所以里面有很多坑,只有踩过...

  • 关于parallelStream的一次踩坑

    Stream 是JAVA8引入的一个新的抽象,为了提高性能同时也加入了parallelStream(并行流)看下面...

  • JDK8下parallelStream()踩坑

    近日在开发过程中,想利用Java提供发流进行数据集合操作,数据量大约是在小几十万左右,本想采用并行流去进行操作,把...

  • java8中Collection新增方法详解

    Collection新增方法: removeIf stream parallelStream spliterato...

  • 屏幕适配的那些坑

    屏幕适配的那些坑 屏幕适配的那些坑

  • Stream

    创建Stream 从 Collection stream() parallelStream()举例: 数组 Arr...

网友评论

      本文标题:ParallelStream的那些坑

      本文链接:https://www.haomeiwen.com/subject/ickgmktx.html