美文网首页
Rxjava 生命周期

Rxjava 生命周期

作者: luowenbin | 来源:发表于2020-03-16 13:56 被阅读0次

Rxjava 的Observable 包含了许多do开头的方法。
这些方法一般不会影响Observable的主流程。
但是这些方法有什么区别和作用呢?
先看一些测试。

测试发射1个数据

Observable.just("data1")
[2020-3-16 12:37:13] doOnSubscribe 
[2020-3-16 12:37:13] doOnLifecycle:OnSubscribe  

[2020-3-16 12:37:13] doOnNext 
[2020-3-16 12:37:13] doOnEach OnNextNotification[data1] 
[2020-3-16 12:37:13] subscribe,OnNext:data1 
[2020-3-16 12:37:13] doAfterNext 

[2020-3-16 12:37:13] doOnEach OnCompleteNotification 
[2020-3-16 12:37:13] doOnComplete 
[2020-3-16 12:37:13] doOnTerminate 
[2020-3-16 12:37:13] subscribe,Complete 
[2020-3-16 12:37:13] doAfterTerminate 
[2020-3-16 12:37:13] doFinally 

测试发射2个数据

Observable.just("data1","data2")
[2020-3-16 12:37:13] doOnSubscribe 
[2020-3-16 12:37:13] doOnLifecycle:OnSubscribe  

[2020-3-16 12:37:13] doOnNext 
[2020-3-16 12:37:13] doOnEach OnNextNotification[data1] 
[2020-3-16 12:37:13] subscribe,OnNext:data1 
[2020-3-16 12:37:13] doAfterNext 

[2020-3-16 12:37:13] doOnNext 
[2020-3-16 12:37:13] doOnEach OnNextNotification[data2] 
[2020-3-16 12:37:13] subscribe,OnNext:data2 
[2020-3-16 12:37:13] doAfterNext 

[2020-3-16 12:37:13] doOnEach OnCompleteNotification 
[2020-3-16 12:37:13] doOnComplete 
[2020-3-16 12:37:13] doOnTerminate 
[2020-3-16 12:37:13] subscribe,Complete 
[2020-3-16 12:37:13] doAfterTerminate 
[2020-3-16 12:37:13] doFinally 

发射1个数据并抛出异常

Observable.create(observableEmitter -> {
            observableEmitter.onNext("data1");
            throw new Exception("error");
        });
[2020-3-16 12:42:50] doOnSubscribe 
[2020-3-16 12:42:50] doOnLifecycle:OnSubscribe  

[2020-3-16 12:42:50] doOnNext 
[2020-3-16 12:42:50] doOnEach OnNextNotification[data1] 
[2020-3-16 12:42:50] subscribe,OnNext:data1 
[2020-3-16 12:42:50] doAfterNext 

[2020-3-16 12:42:50] doOnEach OnErrorNotification[java.lang.Exception: error] 
[2020-3-16 12:42:50] doOnTerminate 
[2020-3-16 12:42:50] doOnError 
[2020-3-16 12:42:50] subscribe,OnError:java.lang.Exception: error 
[2020-3-16 12:42:50] doAfterTerminate 
[2020-3-16 12:42:50] doFinally 

发射1个数据和异常

Observable.create(observableEmitter -> {
            observableEmitter.onNext("data1");
            observableEmitter.onError(new Exception("error"));
        });
[2020-3-16 12:42:50] doOnSubscribe 
[2020-3-16 12:42:50] doOnLifecycle:OnSubscribe  

[2020-3-16 12:42:50] doOnNext 
[2020-3-16 12:42:50] doOnEach OnNextNotification[data1] 
[2020-3-16 12:42:50] subscribe,OnNext:data1 
[2020-3-16 12:42:50] doAfterNext 

[2020-3-16 12:42:50] doOnEach OnErrorNotification[java.lang.Exception: error] 
[2020-3-16 12:42:50] doOnTerminate 
[2020-3-16 12:42:50] doOnError 
[2020-3-16 12:42:50] subscribe,OnError:java.lang.Exception: error 
[2020-3-16 12:42:50] doAfterTerminate 
[2020-3-16 12:42:50] doFinally 

发射1个数据后Dispose

Observable.create(observableEmitter -> {
                observableEmitter.onNext("data1");
                disposable.dispose();
                observableEmitter.onNext("data2");
            })
[2020-4-6 17:42:19] doOnSubscribe 
[2020-4-6 17:42:19] doOnLifecycle:OnSubscribe  
[2020-4-6 17:42:19] doOnNext 
[2020-4-6 17:42:19] doOnEach OnNextNotification[data1] 
[2020-4-6 17:42:19] subscribe,OnNext:data1 
[2020-4-6 17:42:19] doAfterNext 
[2020-4-6 17:42:19] doOnLifecycle: doOnDispose 
[2020-4-6 17:42:19] doOnDispose 
[2020-4-6 17:42:19] doFinally 

发射1个数据后Dispose,抛出异常

Observable.<String>create(observableEmitter -> {
                observableEmitter.onNext("data1");
                disposable.dispose();
                throw new Exception("error");
            })
[2020-4-6 17:42:19] doOnSubscribe 
[2020-4-6 17:42:19] doOnLifecycle:OnSubscribe  
[2020-4-6 17:42:19] doOnNext 
[2020-4-6 17:42:19] doOnEach OnNextNotification[data1] 
[2020-4-6 17:42:19] subscribe,OnNext:data1 
[2020-4-6 17:42:19] doAfterNext 
[2020-4-6 17:42:19] doOnLifecycle: doOnDispose 
[2020-4-6 17:42:19] doOnDispose 
[2020-4-6 17:42:19] doFinally 
Error:io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.Exception: error

发射1个数据后Dispose,发射异常

Observable.<String>create(observableEmitter -> {
                observableEmitter.onNext("data1");
                disposable.dispose();
                observableEmitter.onError(new Exception("error"));
            })
[2020-4-6 17:42:19] doOnSubscribe 
[2020-4-6 17:42:19] doOnLifecycle:OnSubscribe  
[2020-4-6 17:42:19] doOnNext 
[2020-4-6 17:42:19] doOnEach OnNextNotification[data1] 
[2020-4-6 17:42:19] subscribe,OnNext:data1 
[2020-4-6 17:42:19] doAfterNext 
[2020-4-6 17:42:19] doOnLifecycle: doOnDispose 
[2020-4-6 17:42:19] doOnDispose 
[2020-4-6 17:42:19] doFinally 
Error:io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.Exception: error

因为已经解除了订阅,因此错误无法传递到下游。追踪rxjava源码,默认是传递到了对于线程的UncaughtExceptionHandler中。
可以在RxJavaPlugins设置拦截自己处理,也可以调用tryOnError或者自行判断是否dispose来选择不传递。

测试代码

private static Disposable doTest(Observable<String> observable) {
        return observable
                .doOnNext(aLong -> echo("doOnNext"))
                .doOnEach(integerNotification -> echo("doOnEach " + integerNotification))
                .doOnComplete(() -> echo("doOnComplete"))
                .doFinally(() -> echo("doFinally"))
                .doOnTerminate(() -> echo("doOnTerminate"))
                .doOnDispose(() -> echo("doOnDispose"))
                .doOnSubscribe(disposable -> echo("doOnSubscribe"))
                .doOnLifecycle(disposable -> echo("doOnLifecycle:OnSubscribe "), () -> echo("doOnLifecycle: doOnDispose"))
                .doAfterNext(integer -> echo("doAfterNext"))
                .doAfterTerminate(() -> echo("doAfterTerminate"))
                .doOnError(throwable -> echo("doOnError"))

                .doOnSubscribe(disposable1 -> disposable=disposable1)//获取subscribe之前的disposable

                .subscribe(integer -> echo("subscribe,OnNext:" + integer),
                        throwable -> echo("subscribe,OnError:" + throwable),
                        () -> echo("subscribe,Complete"));
    }

可以得出几个有用的结论:

首先先说明:提交≠执行,后面解释。

1.提交的顺序

doOnTerminatedoAfterTerminate
这两方法,分别是在subscribe 前后提交的。

doOnNextdoAfterNext
这两方法,分别是在subscribe 前后提交的。

doFinally 总是最后提交的,并且是一定提交的,无论是data,error,或者dispose
因此可以进行一些释放资源,或者关闭加载对话框的工作。

2.doXxx()的顺序不同实际上有区别吗?

答案是有区别的:

推荐阅读官方文档中upstream-upstream

比如:

observable
                .doOnNext(aLong -> echo("doOnNext1"))
                .doOnNext(aLong -> echo("doOnNext2"))
                .doOnNext(aLong -> echo("doOnNext3"))
                .doOnNext(aLong -> echo("doOnNext4"))
                .doOnNext(s -> {throw new Exception("Test Error");})
                .doOnNext(aLong -> echo("doOnNext5"))
                .doOnNext(aLong -> echo("doOnNext6"))
                .doOnNext(aLong -> echo("doOnNext7"))

抛出异常以后的 downstream 以及 downstream 的 downstream 都没办法执行了。
因此doOnNext5,doOnNext6,doOnNext7是不会执行的。

在看可能比较多人会遇到的问题:
为什么有时候的doFinally,doOnTerminate,doOnDispose ... 不执行?
可以观察在上面测试代码中的这一句代码。

.doOnSubscribe(disposable1 -> disposable=disposable1)

假如放在了doFinally前面,那么解除订阅的时候,在doFinally的upstream 就已经断开了,因此doFinally 是不会执行的。

所以doOnSubscribe在不同位置获取到的disposable的作用也是不一样的,一般情况下,获取disposable的doOnSubscribe紧接着subscribe就行了。


一些其他结论:

1.使用同名字doXxx()方法多次

这些方法如doOnNext()等可以使用多次,不会覆盖掉之前的。

2.不同的doXxx()方法的实际执行顺序

上面测试没有切换线程,因此得出来的提交顺序恰好是执行顺序。
但是实际执行是取决于Scheduler的。

当使用了Scheduler的时候,先提交的,也有可能会后执行。
也就是说,doFinally,也有可能不是最后执行。

因此,一般这些方法可以起到 “顺便执行” 的作用。
比如注册流程是:

->调用注册
->调用登录
-->顺便保存登录的密码到本地
->跳转到主页面

有顺序要求的,可以使用flatMap,或者map。

相关文章

网友评论

      本文标题:Rxjava 生命周期

      本文链接:https://www.haomeiwen.com/subject/eqqgehtx.html