前言
我们在上面的篇幅讲解了Rxjava的操作符的使用,那么这篇文章我们将讲解Rxjava的背压模式。
在Rxjava1.0的时候还没有背压模式,当我们被观察者大量发送事件,远远大于观察者处理事件的速度的时候,会造成内存溢出。这时候背压模式就产生了。
背压模式的代码实现
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 200; i++) {
e.onNext(i);
}
e.onComplete();
}
},
BackpressureStrategy.ERROR
// BackpressureStrategy.BUFFER
// BackpressureStrategy.DROP
// BackpressureStrategy.LATEST
).
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(Integer integer) {
try {
Thread.currentThread().sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
这里有一个BackpressureStrategy参数,一共有四种MODE。
BackpressureStrategy.ERROR
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,如果缓存池满了,就会抛出异常。
BackpressureStrategy.BUFFER
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,如果缓存池满了,就会等待下游处理。
BackpressureStrategy.DROP
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,如果缓存池满了,就会丢弃多余事件。
BackpressureStrategy.LATEST
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,只会存储128个事件。
这里需要注意的是,缓存池大小就是128。
我们运行上述代码,会发现程序直接报错了(onError: create: could not emit value due to lack of requests)。因为我们采用了BackpressureStrategy.ERROR模式,并且发送的事件大于128。
我们将发送的事件改为100,然后在被观察者的onSubscribe方法中请求了三次,这时就会打印三次。结果如下图:
onNext: 0
onNext: 1
onNext: 2
然后我们在请求数据的时候新建一个线程:
s.request(3);
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
Log.d(TAG,"Thread request");
} catch (InterruptedException e) {
e.printStackTrace();
}
s.request(3);
}
}).start();
onNext: 0
onNext: 1
onNext: 2
Thread request
onNext: 3
onNext: 4
onNext: 5
这时候从结果可以看出,当我们采用异步线程时,我们可以在线程中请求数据,并且可以多次请求。
注意:异步线程切换时,被观察者的线程只会切换一次,而观察者的线程每次都会切换。
总结:
Flowable就是Observable的升级版,用法大致相同,他的主要不同点有以下几点:
1.Flowable在create方法中多了一个BackpressureStrategy参数,一共有四种MODE。
2.Observable有一个切断被观察者和观察者dispose方法,而在Flowable方法中这是采用request方法请求被观察者的事件。
网友评论