Rxjava 的Observable
包含了许多do
开头的方法。
这些方法一般不会影响Observable的主流程。
但是这些方法有什么区别和作用呢?
先看一些测试。
测试发射1个数据
Observable.just("data1")
[2020-3-16 12:37:13] doOnSubscribe
[2020-3-16 12:37:13] doOnLifecycle:OnSubscribe
[2020-3-16 12:37:13] doOnNext
[2020-3-16 12:37:13] doOnEach OnNextNotification[data1]
[2020-3-16 12:37:13] subscribe,OnNext:data1
[2020-3-16 12:37:13] doAfterNext
[2020-3-16 12:37:13] doOnEach OnCompleteNotification
[2020-3-16 12:37:13] doOnComplete
[2020-3-16 12:37:13] doOnTerminate
[2020-3-16 12:37:13] subscribe,Complete
[2020-3-16 12:37:13] doAfterTerminate
[2020-3-16 12:37:13] doFinally
测试发射2个数据
Observable.just("data1","data2")
[2020-3-16 12:37:13] doOnSubscribe
[2020-3-16 12:37:13] doOnLifecycle:OnSubscribe
[2020-3-16 12:37:13] doOnNext
[2020-3-16 12:37:13] doOnEach OnNextNotification[data1]
[2020-3-16 12:37:13] subscribe,OnNext:data1
[2020-3-16 12:37:13] doAfterNext
[2020-3-16 12:37:13] doOnNext
[2020-3-16 12:37:13] doOnEach OnNextNotification[data2]
[2020-3-16 12:37:13] subscribe,OnNext:data2
[2020-3-16 12:37:13] doAfterNext
[2020-3-16 12:37:13] doOnEach OnCompleteNotification
[2020-3-16 12:37:13] doOnComplete
[2020-3-16 12:37:13] doOnTerminate
[2020-3-16 12:37:13] subscribe,Complete
[2020-3-16 12:37:13] doAfterTerminate
[2020-3-16 12:37:13] doFinally
发射1个数据并抛出异常
Observable.create(observableEmitter -> {
observableEmitter.onNext("data1");
throw new Exception("error");
});
[2020-3-16 12:42:50] doOnSubscribe
[2020-3-16 12:42:50] doOnLifecycle:OnSubscribe
[2020-3-16 12:42:50] doOnNext
[2020-3-16 12:42:50] doOnEach OnNextNotification[data1]
[2020-3-16 12:42:50] subscribe,OnNext:data1
[2020-3-16 12:42:50] doAfterNext
[2020-3-16 12:42:50] doOnEach OnErrorNotification[java.lang.Exception: error]
[2020-3-16 12:42:50] doOnTerminate
[2020-3-16 12:42:50] doOnError
[2020-3-16 12:42:50] subscribe,OnError:java.lang.Exception: error
[2020-3-16 12:42:50] doAfterTerminate
[2020-3-16 12:42:50] doFinally
发射1个数据和异常
Observable.create(observableEmitter -> {
observableEmitter.onNext("data1");
observableEmitter.onError(new Exception("error"));
});
[2020-3-16 12:42:50] doOnSubscribe
[2020-3-16 12:42:50] doOnLifecycle:OnSubscribe
[2020-3-16 12:42:50] doOnNext
[2020-3-16 12:42:50] doOnEach OnNextNotification[data1]
[2020-3-16 12:42:50] subscribe,OnNext:data1
[2020-3-16 12:42:50] doAfterNext
[2020-3-16 12:42:50] doOnEach OnErrorNotification[java.lang.Exception: error]
[2020-3-16 12:42:50] doOnTerminate
[2020-3-16 12:42:50] doOnError
[2020-3-16 12:42:50] subscribe,OnError:java.lang.Exception: error
[2020-3-16 12:42:50] doAfterTerminate
[2020-3-16 12:42:50] doFinally
发射1个数据后Dispose
Observable.create(observableEmitter -> {
observableEmitter.onNext("data1");
disposable.dispose();
observableEmitter.onNext("data2");
})
[2020-4-6 17:42:19] doOnSubscribe
[2020-4-6 17:42:19] doOnLifecycle:OnSubscribe
[2020-4-6 17:42:19] doOnNext
[2020-4-6 17:42:19] doOnEach OnNextNotification[data1]
[2020-4-6 17:42:19] subscribe,OnNext:data1
[2020-4-6 17:42:19] doAfterNext
[2020-4-6 17:42:19] doOnLifecycle: doOnDispose
[2020-4-6 17:42:19] doOnDispose
[2020-4-6 17:42:19] doFinally
发射1个数据后Dispose,抛出异常
Observable.<String>create(observableEmitter -> {
observableEmitter.onNext("data1");
disposable.dispose();
throw new Exception("error");
})
[2020-4-6 17:42:19] doOnSubscribe
[2020-4-6 17:42:19] doOnLifecycle:OnSubscribe
[2020-4-6 17:42:19] doOnNext
[2020-4-6 17:42:19] doOnEach OnNextNotification[data1]
[2020-4-6 17:42:19] subscribe,OnNext:data1
[2020-4-6 17:42:19] doAfterNext
[2020-4-6 17:42:19] doOnLifecycle: doOnDispose
[2020-4-6 17:42:19] doOnDispose
[2020-4-6 17:42:19] doFinally
Error:io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.Exception: error
发射1个数据后Dispose,发射异常
Observable.<String>create(observableEmitter -> {
observableEmitter.onNext("data1");
disposable.dispose();
observableEmitter.onError(new Exception("error"));
})
[2020-4-6 17:42:19] doOnSubscribe
[2020-4-6 17:42:19] doOnLifecycle:OnSubscribe
[2020-4-6 17:42:19] doOnNext
[2020-4-6 17:42:19] doOnEach OnNextNotification[data1]
[2020-4-6 17:42:19] subscribe,OnNext:data1
[2020-4-6 17:42:19] doAfterNext
[2020-4-6 17:42:19] doOnLifecycle: doOnDispose
[2020-4-6 17:42:19] doOnDispose
[2020-4-6 17:42:19] doFinally
Error:io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.Exception: error
因为已经解除了订阅,因此错误无法传递到下游。追踪rxjava源码,默认是传递到了对于线程的UncaughtExceptionHandler中。
可以在RxJavaPlugins设置拦截自己处理,也可以调用tryOnError或者自行判断是否dispose来选择不传递。
测试代码
private static Disposable doTest(Observable<String> observable) {
return observable
.doOnNext(aLong -> echo("doOnNext"))
.doOnEach(integerNotification -> echo("doOnEach " + integerNotification))
.doOnComplete(() -> echo("doOnComplete"))
.doFinally(() -> echo("doFinally"))
.doOnTerminate(() -> echo("doOnTerminate"))
.doOnDispose(() -> echo("doOnDispose"))
.doOnSubscribe(disposable -> echo("doOnSubscribe"))
.doOnLifecycle(disposable -> echo("doOnLifecycle:OnSubscribe "), () -> echo("doOnLifecycle: doOnDispose"))
.doAfterNext(integer -> echo("doAfterNext"))
.doAfterTerminate(() -> echo("doAfterTerminate"))
.doOnError(throwable -> echo("doOnError"))
.doOnSubscribe(disposable1 -> disposable=disposable1)//获取subscribe之前的disposable
.subscribe(integer -> echo("subscribe,OnNext:" + integer),
throwable -> echo("subscribe,OnError:" + throwable),
() -> echo("subscribe,Complete"));
}
可以得出几个有用的结论:
首先先说明:提交≠执行,后面解释。
1.提交的顺序
doOnTerminate 和 doAfterTerminate
这两方法,分别是在subscribe 前后提交的。
doOnNext 和 doAfterNext
这两方法,分别是在subscribe 前后提交的。
doFinally 总是最后提交的,并且是一定提交的,无论是data
,error
,或者dispose
。
因此可以进行一些释放资源,或者关闭加载对话框的工作。
2.doXxx()
的顺序不同实际上有区别吗?
答案是有区别的:
推荐阅读官方文档中upstream-upstream
比如:
observable
.doOnNext(aLong -> echo("doOnNext1"))
.doOnNext(aLong -> echo("doOnNext2"))
.doOnNext(aLong -> echo("doOnNext3"))
.doOnNext(aLong -> echo("doOnNext4"))
.doOnNext(s -> {throw new Exception("Test Error");})
.doOnNext(aLong -> echo("doOnNext5"))
.doOnNext(aLong -> echo("doOnNext6"))
.doOnNext(aLong -> echo("doOnNext7"))
抛出异常以后的 downstream 以及 downstream 的 downstream 都没办法执行了。
因此doOnNext5,doOnNext6,doOnNext7是不会执行的。
在看可能比较多人会遇到的问题:
为什么有时候的doFinally,doOnTerminate,doOnDispose ... 不执行?
可以观察在上面测试代码中的这一句代码。
.doOnSubscribe(disposable1 -> disposable=disposable1)
假如放在了doFinally前面,那么解除订阅的时候,在doFinally的upstream 就已经断开了,因此doFinally 是不会执行的。
所以doOnSubscribe在不同位置获取到的disposable的作用也是不一样的,一般情况下,获取disposable的doOnSubscribe紧接着subscribe就行了。
一些其他结论:
1.使用同名字doXxx()
方法多次
这些方法如doOnNext()
等可以使用多次,不会覆盖掉之前的。
2.不同的doXxx()
方法的实际执行顺序
上面测试没有切换线程,因此得出来的提交顺序恰好是执行顺序。
但是实际执行是取决于Scheduler
的。
当使用了Scheduler
的时候,先提交的,也有可能会后执行。
也就是说,doFinally,也有可能不是最后执行。
因此,一般这些方法可以起到 “顺便执行” 的作用。
比如注册流程是:
->调用注册
->调用登录
-->顺便保存登录的密码到本地
->跳转到主页面
有顺序要求的,可以使用flatMap,或者map。
网友评论