一、disposable用法
public Flowable<Integer> getRxJavaFlowableRealExample() {
return Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
if (emitter.isCancelled()) {
break;
}
while (emitter.requested() == 0) {
}
LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "--:" + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR);
}
public void rxJavaFlowableConsumeExample() {
Disposable disposable = model.getRxJavaFlowableRealExample()
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.error(TAG, "rxJavaFlowableConsumeExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
}
});
compositeDisposable.add(disposable);
}
1、在activity的生命周期结束前清除compositeDisposable即可
public void onFinishActivity() {
compositeDisposable.dispose();
}
二、注意事项
1、 compositeDisposable.dispose(); 此处只是把观察者和被观察者之前的通道切断,观察者不能接收数据,而被观察者则会继续发送数据到结束为止。
2、如上例所示,如果在无限for循环中不添加
if (emitter.isCancelled()) {
break;
}
这样的语句,则被观察者会无限发送数据。
日志
备注:(关闭acitivity情况下,等于已经切断通道,可以requested会保持某个值,一直打印数据)
08-31 14:14:15.203 29916-30066/com.example.zhang D/MainModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87145
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87146
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87147
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87148
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
三、总结
1、被观察者如果是无限发送数据,或者有线程阻塞的情况下,要手动控制
类似
if (emitter.isCancelled()) {
break;
}
河马过河微信公众号.jpg
网友评论