概述
本文不描述RxJava是什么,以及如何使用的,重点讨论如何使用RxJava实现并发。即:
- 区分线程切换和并发;
- 如何使用好parallel实现并发。
通过一个简化的例子来说明整个过程:
整数->休眠100ms->字符串
即把一批正数转化为字符串。
设备信息
- 8核电脑
- Java 8
- RxJava 2
线程切换和并发的区别
刚开始接触RxJava的时候,是因为高并发以及函数式编程。使用的是observeOn()和subscribeOn()来管理线程。这种实际上并不是并发,而是线程切换。
observeOn的线程切换
subscribeOn的线程切换
关于observeOn()和subscribeOn()的区别可以参考RxJava之五—— observeOn()与subscribeOn()的详解
但是不论是observeOn还是subscribeOn,只是做到了线程切换而不是并发。从图上可以发现,发送的红黄绿等球并不是并发的,而是经过切换之后都在同一个线程上去做处理。所以这种场景比较适合:
- 区分计算和IO线程。把计算放到同一个线程上,把IO放到同一个线程上去,不要因为IO浪费计算性能。
- IO和计算的时间相差不大。假设系统是IO密集型但是计算很少的系统,那么就不适合。如果IO时间和计算时间差不多,那就有必要区分。
总之,就是为了把不同的操作放到不同的线程上,就需要切换线程,但是这种并没有解决并发问题。
代码实现
LocalDateTime localDateTime = LocalDateTime.now();
List<String> strings = Flowable.fromIterable(IntStream.range(1, 8).mapToObj(i -> i).collect(Collectors.toList())).subscribeOn(Schedulers.computation()).map(i -> {
System.out.println("subscribeOn后线程名称:" + Thread.currentThread().getName() + ", 当前值:" + i);
return i;
}).observeOn(Schedulers.computation()).map(i -> {
System.out.println("observeOn后线程名称:" + Thread.currentThread().getName() + ", 当前值:" + i);
return i;
}).map(platform -> {
Thread.sleep(100);
return "转化的结果为:" + platform;
}).collect((Callable<List<String>>) () -> new ArrayList<>(8), (strings1, s) -> strings1.add(s)).blockingGet();
System.out.println(strings);
System.out.println("一共耗时" + Duration.between(localDateTime, LocalDateTime.now()).toMillis());
输出结果为:
subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:1
subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:2
observeOn后线程名称:RxComputationThreadPool-1, 当前值:1
subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:3
subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:4
subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:5
subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:6
subscribeOn后线程名称:RxComputationThreadPool-2, 当前值:7
observeOn后线程名称:RxComputationThreadPool-1, 当前值:2
observeOn后线程名称:RxComputationThreadPool-1, 当前值:3
observeOn后线程名称:RxComputationThreadPool-1, 当前值:4
observeOn后线程名称:RxComputationThreadPool-1, 当前值:5
observeOn后线程名称:RxComputationThreadPool-1, 当前值:6
observeOn后线程名称:RxComputationThreadPool-1, 当前值:7
[转化的结果为:1, 转化的结果为:2, 转化的结果为:3, 转化的结果为:4, 转化的结果为:5, 转化的结果为:6, 转化的结果为:7]
一共耗时967
发现三个点:
- subscribeOn和observeOn之后,线程果然发生了变化。
- 切换了线程之后,同一个函数都在同一个线程上执行。
- 发现时间大于8*100=800ms,即并没有并发节省时间。
如何并发
前面的论述说明了:切换线程不是并发,真正的并发是parallel()函数。首先看同一个需求的并发实现:
LocalDateTime localDateTime = LocalDateTime.now();
List<String> strings = Flowable.fromIterable(IntStream.range(1, 8).mapToObj(i -> i).collect(Collectors.toList())).parallel().runOn(Schedulers.computation()).map(platform -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName() + ", 当前转化值:" + platform);
Thread.sleep(100);
return "转化的结果为:" + platform;
}).sequential().collect((Callable<List<String>>) () -> new ArrayList<>(8), (strings1, s) -> strings1.add(s)).blockingGet();
System.out.println(strings);
System.out.println("一共耗时" + Duration.between(localDateTime, LocalDateTime.now()).toMillis());
执行结果为:
当前线程名称:RxComputationThreadPool-1, 当前转化值:1
当前线程名称:RxComputationThreadPool-5, 当前转化值:5
当前线程名称:RxComputationThreadPool-2, 当前转化值:2
当前线程名称:RxComputationThreadPool-3, 当前转化值:3
当前线程名称:RxComputationThreadPool-4, 当前转化值:4
当前线程名称:RxComputationThreadPool-7, 当前转化值:7
当前线程名称:RxComputationThreadPool-6, 当前转化值:6
[转化的结果为:3, 转化的结果为:1, 转化的结果为:2, 转化的结果为:4, 转化的结果为:5, 转化的结果为:6, 转化的结果为:7]
一共耗时254
通过结果也发现几个特点:
- 每个转换都在不同的线程;
- 总时间小于8*100=800ms。
所以通过parallel方法和runOn实现了不同线程上的并发。
parallel原理说明
parallel原理图图中分为三个轨道,然后把元素轮询分配到不同的轨道上。
parallel方法说明
parallel的重构方法有三个,分别是:parallel()、parallel(int parallelism)和parallel(int parallelism,int prefetch)三个。主要区别是轨道数目以及如何元素与轨道的关系。
parallel()方法
默认的轨道数是CPU的核数,我的是8核CPU,所以线程默认是8。如果不修改这个数目,开再多的线程也没用。
parallel(int parallelist)
自己制定轨道数目,也是自己控制并发数。
parallel(int parallelis,int prefetch)
这里的prefetch表示的是缓存大小,类似于bufferSize。具体校验结果待定,后面确认了之后再补。
the number of items each 'rail' should prefetch——每个轨道预取的元素个数
特点
使用parallel的时候,并不是说某个轨道执行速度快,分配到这个轨道上的元素就多,而是会把元素按顺序平均分配到各个轨道上。
总结
通过前面介绍,发现如果需要并发就用parallel函数,并发数通过parallel的参数来控制,而不是通过自己创建的线程数。
网友评论