在RxJava2中使用背压非常简单,由于规范要求所有的操作符强制支持背压,因此新的 create 采用了保守的设计,让用户实现 FlowableOnSubscribe 接口,并选取背压策略,然后在内部实现封装支持背压,简单的例子如下:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER);
而在RxJava2.0 中,Observable 不再支持背压,而是改用Flowable 支持非阻塞式的背压。Flowable是RxJava2.0中专门用于应对背压(Backpressure)问题而新增的(抽象)类。其中,Flowable默认队列大小为128。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。
上面提到的四种operator的前三种分别对应Flowable的三种Backpressure策略:
BackpressureStrategy.BUFFER
BackpressureStrategy.DROP
BackpressureStrategy.LATEST
onBackpressureBuffer:是不丢弃数据的处理方式。
把上游收到的全部缓存下来,等下游来请求再发给下游。相当于一个水库。但上游太快,水库(buffer)就会溢出。
onBackpressureDrop和onBackpressureLatest比较类似,都会丢弃数据。这两种策略相当于一种令牌机制(或者配额机制),下游通过request请求产生令牌(配额)给上游,上游接到多少令牌,就给下游发送多少数据。当令牌数消耗到0的时候,上游开始丢弃数据。但这两种策略在令牌数为0的时候有一点微妙的区别。
onBackpressureDrop :直接丢弃数据,不缓存任何数据;
onBackpressureLatest :则缓存最新的一条数据,这样当上游接到新令牌的时候,它就先把缓存的上一条“最新”数据发送给下游。可以结合下面两幅图来理解。
如果增加订阅:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
}
@Override
public void onNext(Integer value) {
//可以处理返回的值
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
网友评论