美文网首页RxJava
RxJava并发parallel的使用

RxJava并发parallel的使用

作者: 啥也不说了 | 来源:发表于2018-09-28 15:52 被阅读162次

    概述

    本文不描述RxJava是什么,以及如何使用的,重点讨论如何使用RxJava实现并发。即:

    1. 区分线程切换和并发;
    2. 如何使用好parallel实现并发。

    通过一个简化的例子来说明整个过程:

    整数->休眠100ms->字符串

    即把一批正数转化为字符串。

    设备信息

    1. 8核电脑
    2. Java 8
    3. RxJava 2

    线程切换和并发的区别

    刚开始接触RxJava的时候,是因为高并发以及函数式编程。使用的是observeOn()和subscribeOn()来管理线程。这种实际上并不是并发,而是线程切换。


    observeOn的线程切换
    subscribeOn的线程切换

    关于observeOn()和subscribeOn()的区别可以参考RxJava之五—— observeOn()与subscribeOn()的详解

    但是不论是observeOn还是subscribeOn,只是做到了线程切换而不是并发。从图上可以发现,发送的红黄绿等球并不是并发的,而是经过切换之后都在同一个线程上去做处理。所以这种场景比较适合:

    1. 区分计算和IO线程。把计算放到同一个线程上,把IO放到同一个线程上去,不要因为IO浪费计算性能。
    2. IO和计算的时间相差不大。假设系统是IO密集型但是计算很少的系统,那么就不适合。如果IO时间和计算时间差不多,那就有必要区分。
      总之,就是为了把不同的操作放到不同的线程上,就需要切换线程,但是这种并没有解决并发问题。

    代码实现

           LocalDateTime localDateTime = LocalDateTime.now();
            List<String> strings = Flowable.fromIterable(IntStream.range(1, 8).mapToObj(i -> i).collect(Collectors.toList())).subscribeOn(Schedulers.computation()).map(i -> {
                System.out.println("subscribeOn后线程名称:" + Thread.currentThread().getName() + ", 当前值:" + i);
                return i;
            }).observeOn(Schedulers.computation()).map(i -> {
                System.out.println("observeOn后线程名称:" + Thread.currentThread().getName() + ", 当前值:" + i);
                return i;
            }).map(platform -> {
                Thread.sleep(100);
                return "转化的结果为:" + platform;
            }).collect((Callable<List<String>>) () -> new ArrayList<>(8), (strings1, s) -> strings1.add(s)).blockingGet();
            System.out.println(strings);
            System.out.println("一共耗时" + Duration.between(localDateTime, LocalDateTime.now()).toMillis());
    

    输出结果为:

    subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:1
    subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:2
    observeOn后线程名称:RxComputationThreadPool-1, 当前值:1
    subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:3
    subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:4
    subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:5
    subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:6
    subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:7
    observeOn后线程名称:RxComputationThreadPool-1, 当前值:2
    observeOn后线程名称:RxComputationThreadPool-1, 当前值:3
    observeOn后线程名称:RxComputationThreadPool-1, 当前值:4
    observeOn后线程名称:RxComputationThreadPool-1, 当前值:5
    observeOn后线程名称:RxComputationThreadPool-1, 当前值:6
    observeOn后线程名称:RxComputationThreadPool-1, 当前值:7
    [转化的结果为:1, 转化的结果为:2, 转化的结果为:3, 转化的结果为:4, 转化的结果为:5, 转化的结果为:6, 转化的结果为:7]

    一共耗时967

    发现三个点:

    1. subscribeOn和observeOn之后,线程果然发生了变化。
    2. 切换了线程之后,同一个函数都在同一个线程上执行。
    3. 发现时间大于8*100=800ms,即并没有并发节省时间。

    如何并发

    前面的论述说明了:切换线程不是并发,真正的并发是parallel()函数。首先看同一个需求的并发实现:

            LocalDateTime localDateTime = LocalDateTime.now();
            List<String> strings = Flowable.fromIterable(IntStream.range(1, 8).mapToObj(i -> i).collect(Collectors.toList())).parallel().runOn(Schedulers.computation()).map(platform -> {
                System.out.println("当前线程名称:" + Thread.currentThread().getName() + ", 当前转化值:" + platform);
                Thread.sleep(100);
                return "转化的结果为:" + platform;
            }).sequential().collect((Callable<List<String>>) () -> new ArrayList<>(8), (strings1, s) -> strings1.add(s)).blockingGet();
            System.out.println(strings);
            System.out.println("一共耗时" + Duration.between(localDateTime, LocalDateTime.now()).toMillis());
    

    执行结果为:

    当前线程名称:RxComputationThreadPool-1, 当前转化值:1
    当前线程名称:RxComputationThreadPool-5, 当前转化值:5
    当前线程名称:RxComputationThreadPool-2, 当前转化值:2
    当前线程名称:RxComputationThreadPool-3, 当前转化值:3
    当前线程名称:RxComputationThreadPool-4, 当前转化值:4
    当前线程名称:RxComputationThreadPool-7, 当前转化值:7
    当前线程名称:RxComputationThreadPool-6, 当前转化值:6
    [转化的结果为:3, 转化的结果为:1, 转化的结果为:2, 转化的结果为:4, 转化的结果为:5, 转化的结果为:6, 转化的结果为:7]
    一共耗时254

    通过结果也发现几个特点:

    1. 每个转换都在不同的线程;
    2. 总时间小于8*100=800ms。
      所以通过parallel方法和runOn实现了不同线程上的并发。

    parallel原理说明

    parallel原理图

    图中分为三个轨道,然后把元素轮询分配到不同的轨道上。

    parallel方法说明

    parallel的重构方法有三个,分别是:parallel()、parallel(int parallelism)和parallel(int parallelism,int prefetch)三个。主要区别是轨道数目以及如何元素与轨道的关系。

    parallel()方法

    默认的轨道数是CPU的核数,我的是8核CPU,所以线程默认是8。如果不修改这个数目,开再多的线程也没用。

    parallel(int parallelist)

    自己制定轨道数目,也是自己控制并发数。

    parallel(int parallelis,int prefetch)

    这里的prefetch表示的是缓存大小,类似于bufferSize。具体校验结果待定,后面确认了之后再补。

    the number of items each 'rail' should prefetch——每个轨道预取的元素个数

    特点

    使用parallel的时候,并不是说某个轨道执行速度快,分配到这个轨道上的元素就多,而是会把元素按顺序平均分配到各个轨道上。

    总结

    通过前面介绍,发现如果需要并发就用parallel函数,并发数通过parallel的参数来控制,而不是通过自己创建的线程数。

    相关文章

      网友评论

        本文标题:RxJava并发parallel的使用

        本文链接:https://www.haomeiwen.com/subject/dmcpoftx.html