翻译上面的原文,因为最近正在用Reactor, 所以想知道什么时候用什么,why? 可能会有多篇和实例,这是第一篇
多个Reactive 框架Reactor /Rxjava (1/2), 还有java8的stream, 来比较下,从8个方面
1. Composable
2. Lazy
3. Reuseable
4.Asynchronous
5.Cacheable
6.Pull or Push
7.Backpressure
8.Operator fusion
参与比较的工具有:
CompletableFucture, Stream, Optional, Observable(Rxjava1), Observable(Rxjava2), Flowable(Rxjava2), Flux(Reactor)
1. Composable: 组合性, 能否function编程
CompletableFucture, 有一组then*()方法,可以组成pipeline,传递你nothing或者single value 从stage到stage
Stream,有一组operators来传递N个value
Optional, 只有map,flatMap,filter少数几个中间方法
Observable, Flux, Flowable, 这点跟stream一样
2. Lazy, 是否懒加载,推迟执行
CompletableFuture, 在对象创建的时候被赋予的是已经启动的Async的线程,只能通过它得到结果,不lazy
Stream, 所有中间操作(返回Stream的)都是lazy,所以trigger操作都是不lazy
Optional, 不lazy, 立即发生
Observable, Flux, Flowable, lazy, 直到subscriber出现
3. Reusable, 复用
CompletableFucture, 可以复用,毕竟只是个wrapper对象
Stream,不可以, javadoc:
一个stream只能在(一个中间或终结操作)执行一次,复用会抛出IllegalStateException,但是一些stream操作会返回receiver而不是一个新的stream对象,所以很难发现
Optional, 可以,是immutable, 立即发生
Observable, Flux, Flowable, 可以从设计上复用,都是从开始调用直到subscriber
4. Asynchronous 异步
CompletableFuture, 本身为了Async而出现,代表了一个work,内部通过ForkJoinPool来操作Executor,默认创建cpu core数量的线城池,或者double(超频),可以通过jvm参数: -Djava.util.concurrent.ForkJoinPool.common.parallelism=? 设置数量
Stream,本身不Async, 可以通过parrallelStream来多线程
Optional, 不行
Observable, Flux, Flowable, 基于Observer模式的,肯定是Async, 还能通过Subscirber接口方法onNext onCompleted来控制, 然后subscribeOn 和observeOn 可以控制 subcription和reception ,类似:
返回:
5. Cacheable , 缓存
reuseable和cacheable的区别: 假设pipeline A , re-use 2次, 创建pipeline B = A + [] , C = A + []
如果B 和C 成功,就是可reusable,
如果B 和 C成功, A的每个stage只被调用1次,就是cacheable, 必须是reusable 才能cacheable
CompletableFuture, 跟reusable的一样,
Stream, 不能cache中间结果
Optional, 可以,因为结果已经得到
Observable, Flowable, Flux, 默认不是, 可以.cache()方法来启用
6. Push, Pull , 是push模式还是pull模式
Stream , Optional是pull的,通过collect(), get()等方法,pull模式意味着阻塞,同步
CompletableFuture, Flux, Observable, Flowable, 是push的, 意味着非阻塞,异步
7.Backpressure, 背压(极差的翻译)
主要是push模式才有, 上游的Push数量超过了下游subscriber的处理能力
CompletableFuture, 不需要,只有一个结果
Observable(Rxjava1), Flux, Flowable, 处理策略:
Bufferring, 缓存,来不及处理的放入buffer, onNext()方法(其中Flux默认LONG_MAX 大小)
Drop, 扔
use latest, 用最新的
None, 不管
Exception, 扔异常
Observable(Rxjava2), 不解决这个问题,必须使用Flowable来处理
8. Operator Fusion, 操作符熔断
是一种多中间阶段的代码优化,只有Reactor 和Rxjava2 支持, 举例:
marco-fusion:
marco-fusion不是很清楚更具体从其他文章分析
总结
Stream,CompletableFuture and Optional都是为了解决特别的问题而出现,不用滥用
Rxjava 和Reactor 也同样, 在大并发的情况下,需要测试使用
网友评论