美文网首页Android-RxJava
Rxjava解除订阅①:自带方式和RxLifeCycle

Rxjava解除订阅①:自带方式和RxLifeCycle

作者: 海阔sun天空 | 来源:发表于2020-05-04 21:25 被阅读0次

    Rxjava解除订阅三部曲:

    前言

    最近在维护老旧网络库的时候,发现网络库底层运用到了Rxjava,而最近凑巧又给app加上了leakcanary检测内存泄漏,发现除了网络库的Rxjava泄漏之外,还有些业务上滥用的Rxjava也存在泄漏的情况。有问题咱就得想办法解决,这个老旧的网络库有点年岁了,写的是真不咋样,奈何app里还有大量引用,该维护的还是得维护。Rxjava作为近几年非常流行的一个三方库,功能就不用多说了,谁用谁知道。

    正文

    Rxjava是好用,可用不好很容易造成内存泄漏。而且理论上Rxjava的每个操作符都可能会造成内存泄漏。举个例子,我们用Rx进行网络请求,然后订阅在主线程进行ui更新。网络请求是在分线程执行,而且有延迟。当请求没有返回时,我们将这个页面关闭了,后续分线程数据回来执行ui更新,而Rx还持有外部类的引用,这就造成了内存泄漏。
    解决的办法Rx本身就给提供,这就是我们要讲的第一种解除订阅的方法:

    • Rxjava提供的Dispose.dispose();

    1.Dispose.dispose()

    这是Rxjava本身提供的一种接触订阅的方式,使用很简单,在页面关闭的时候,或者在需要的时候调用下dispose()方法就可以了。
    如果最后的订阅者是Consumer,那么会有一个返回值Dispose。那么在需要的时候,就可以调用Dispose.dispose()。但往往我们使用Rxjava的时候,都需要对正常返回和异常返回做些通用处理,使用的往往是Observer,这样做的结果就是没有返回值。
    其实Observer也已经给我们准备好了解除订阅的方式,我们不妨看下Observer的源码:

    public interface Observer<T> {
        void onSubscribe(@NonNull Disposable d);
        void onNext(@NonNull T t);
        void onError(@NonNull Throwable e);
        void onComplete();
    }
    

    有个方法onSubscribe(@NonNull Disposable d),有一个Dispose的参数,那么我们就可以接收这个变量,在在需要的时候调用Dispose.dispose()
    这个方法onSubscribe意味着当subscribe方法被调用之前,就会拿到Dispose句柄,此时Rxjava任何一个相关的操作符处理都还未执行,调用dispose()方法后,完成解除订阅。

    扩展: CompositeDisposable

    上面讲的是针对单个Dispose进行订阅解除,可往往实际使用中,我们可希望看到一堆Dispose的成员变量在页面销毁的时候扎堆解除订阅。这时候就需要CompositeDisposable,简单的理解,就是可以对Dispose进行批量的处理,类似于List集合,其内部实现方法也很类似,包括add,addAll,delete,remove,clear,dispose,isDisposed
    方法使用都很简单,我们把Dispose1,Dispose2,Dispose3使用add方法,添加到CompositeDisposable中,在页面销毁时调用dispose进行批量解除。
    这里对dispose和clear方法进行单独说明下,dispose执行后,会改变CompositeDisposable的状态为disposed,即已完成订阅解除状态,而clear则只会批量解除订阅,不会改变整个CompositeDisposabledisposed状态。我们看下源码就知道了:
    dispose方法:

    dispose
    而clear方法:
    clear
    差异就在红箭头那边。
    而其他的方法都要去判断disposed状态,已经disposed的直接return,不会继续执行。

    2.RxLifeCycle

    接下来开始,就是比较骚的操作了。RxLifeCycle,顾名思义就是对Rxjava生命周期管理,也就是意味着,Rxjava长大,已经学会了自己该何时进行解除订阅。github直达链接

    虽然没有中文文档,但摸索起来也不困难,首先添加核心依赖:

    implementation 'com.trello.rxlifecycle3:rxlifecycle:3.1.0'
    implementation 'com.trello.rxlifecycle3:rxlifecycle-components:3.1.0'
    

    RxLifeCycle使用需要继承RxAppCompatActivity,Fragment也需要继承RxFragment,当然还有一些其他扩展,比如RxDialogFragment等,大家自己去体验吧。
    使用起来也是很简单,直接看代码吧:

            Observable.just(1)
                    .compose(this.<Integer>bindToLifecycle())
                    .subscribe();
    

    或者:

            Observable.just(1)
                    .compose(this.<Integer>bindUntilEvent(ActivityEvent.DESTROY))
                    .subscribe();
    

    核心使用就这两个方法,这俩方法也是有些许的区别。

    bindToLifecycle():自动识别在合适的生命周期内解除绑定。

    bindUntilEvent(ActivityEvent):在指定的生命周期内解除绑定。

    对于bindUntilEvent(ActivityEvent)很容易理解,指定一个生命周期解除绑定,但对于bindToLifecycle()如何自动识别生命周期有些疑问,我们不妨写个demo测试下效果如何:

        private void test() {
            subscribe = Observable.interval(0, 2, TimeUnit.SECONDS)
                    .map(new Function<Long, Long>() {
                        @Override
                        public Long apply(Long aLong) throws Exception {
                            Log.d(TAG, "当前发射数值:" + aLong);
                            return aLong;
                        }
                    })
                    .compose(this.<Long>bindToLifecycle())
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            Log.d(TAG, "当前接收数值:" + aLong);
                        }
                    });
        }
    

    每个2s发射一个数值,无限的发,调用bindToLifecycle(),然后我们在onCreate方法调用,最终打印效果:

    D/zdu_Rxdemo: onCreate,subscribe.isDisposed():false
    D/zdu_Rxdemo: 当前发射数值:0
    D/zdu_Rxdemo: 当前接收数值:0
    D/zdu_Rxdemo: onStart,subscribe.isDisposed():false
    D/zdu_Rxdemo: onResume,subscribe.isDisposed():false
    D/zdu_Rxdemo: 当前发射数值:1
    D/zdu_Rxdemo: 当前接收数值:1
    D/zdu_Rxdemo: 当前发射数值:2
    D/zdu_Rxdemo: 当前接收数值:2
    D/zdu_Rxdemo: onPause,subscribe.isDisposed():false
    D/zdu_Rxdemo: 当前发射数值:3
    D/zdu_Rxdemo: 当前接收数值:3
    D/zdu_Rxdemo: onStop,subscribe.isDisposed():false
    D/zdu_Rxdemo: onDestroy,subscribe.isDisposed():true
    

    onDestroy方法中自动解除订阅了,而代码中并没有主动去调用dispose方法,可见自动解除订阅生效了。

    那如果在onStart方法订阅的话,解除订阅的生命周期又不一样了:

    D/zdu_Rxdemo: onStart,subscribe.isDisposed():false
    D/zdu_Rxdemo: 当前发射数值:0
    D/zdu_Rxdemo: 当前接收数值:0
    D/zdu_Rxdemo: onResume,subscribe.isDisposed():false
    D/zdu_Rxdemo: 当前发射数值:1
    D/zdu_Rxdemo: 当前接收数值:1
    D/zdu_Rxdemo: 当前发射数值:2
    D/zdu_Rxdemo: 当前接收数值:2
    D/zdu_Rxdemo: onPause,subscribe.isDisposed():false
    D/zdu_Rxdemo: onStop,subscribe.isDisposed():true
    D/zdu_Rxdemo: onDestroy,subscribe.isDisposed():true
    

    onStop生命周期内就被解除订阅了,那我们在onResume中订阅的话,是不是就会在onPause中解除订阅了呢?事实上确实是这样的,日志就不打印了,我们直接去看源码实现。

    刚刚我们调用的是RxAppCompatActivity的两个方法:

        @NonNull
        @CheckResult
        public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
            return RxLifecycle.bindUntilEvent(this.lifecycleSubject, event);
        }
    
        @NonNull
        @CheckResult
        public final <T> LifecycleTransformer<T> bindToLifecycle() {
            return RxLifecycleAndroid.bindActivity(this.lifecycleSubject);
        }
    

    暂且现不管this.lifecycleSubject是什么,我们继续向下看源码:

        public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle,
                                                                    @Nonnull final R event) {
            checkNotNull(lifecycle, "lifecycle == null");
            checkNotNull(event, "event == null");
            return bind(takeUntilEvent(lifecycle, event));
        }
    
        private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
            return lifecycle.filter(new Predicate<R>() {
                @Override
                public boolean test(R lifecycleEvent) throws Exception {
                    return lifecycleEvent.equals(event);
                }
            });
        }
    

    bindUntilEvent就很明确了,底层用了filter过滤操作符,过滤了非指定生命周期。但生命周期是怎么下发下来的呢?最后再做解释。

    public static <T> LifecycleTransformer<T> bindActivity(@NonNull Observable<ActivityEvent> lifecycle) {
            return RxLifecycle.bind(lifecycle, ACTIVITY_LIFECYCLE);
        }
    

    bindToLifecycle的底层源码最终跟takeUntilEvent的底层源码一致,都指向了
    RxLifecycle.bind方法。我们就看下bind方法到底是执行了什么?

    先看bindUntilEvent

    public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
            return new LifecycleTransformer<>(lifecycle);
        }
    

    new了一个LifecycleTransformer,在看下内部实现:

    public final class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
                                                          FlowableTransformer<T, T>,
                                                          SingleTransformer<T, T>,
                                                          MaybeTransformer<T, T>,
                                                          CompletableTransformer
    {
        final Observable<?> observable;
    
        LifecycleTransformer(Observable<?> observable) {
            checkNotNull(observable, "observable == null");
            this.observable = observable;
        }
    
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream.takeUntil(observable);
        }
    
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST));
        }
    
        @Override
        public SingleSource<T> apply(Single<T> upstream) {
            return upstream.takeUntil(observable.firstOrError());
        }
    
        @Override
        public MaybeSource<T> apply(Maybe<T> upstream) {
            return upstream.takeUntil(observable.firstElement());
        }
    
        @Override
        public CompletableSource apply(Completable upstream) {
            return Completable.ambArray(upstream, observable.flatMapCompletable(Functions.CANCEL_COMPLETABLE));
        }
    }
    

    原来是一个实现ObservableTransformer等接口的类,到这里也明白了为什么RxLifeCycle要用compose操作符,并且其内部实现使用了takeUntil操作符,在符合条件后,打断上游链。这也说明了,我们要在订阅前一刻执行这个自动解除订阅打断上游链,而对下游链没有作用。

    再看下bindToLifecyclebind实现,与takeUntilEvent稍微有点不同的是它要自动判断生命周期:

    @NonNull
        @CheckResult
        public static <T> LifecycleTransformer<T> bindActivity(@NonNull Observable<ActivityEvent> lifecycle) {
            return RxLifecycle.bind(lifecycle, ACTIVITY_LIFECYCLE);
        }
    

    ACTIVITY_LIFECYCLE则是一个switch取值:

    private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE = new Function<ActivityEvent, ActivityEvent>() {
            public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
                switch(lastEvent) {
                case CREATE:
                    return ActivityEvent.DESTROY;
                case START:
                    return ActivityEvent.STOP;
                case RESUME:
                    return ActivityEvent.PAUSE;
                case PAUSE:
                    return ActivityEvent.STOP;
                case STOP:
                    return ActivityEvent.DESTROY;
                case DESTROY:
                    throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
                default:
                    throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
                }
            }
        };
    

    这就印证了我们之前的demo,在create的时候返回是ActivityEvent.DESTORY,对应START返回的就是STOP生命周期等等。

    public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,  @Nonnull final Function<R, R> correspondingEvents) {
            checkNotNull(lifecycle, "lifecycle == null");
            checkNotNull(correspondingEvents, "correspondingEvents == null");
            return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
        }
    
        private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
                                                                           final Function<R, R> correspondingEvents) {
            return Observable.combineLatest(
                lifecycle.take(1).map(correspondingEvents),
                lifecycle.skip(1),
                new BiFunction<R, R, Boolean>() {
                    @Override
                    public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                        return lifecycleEvent.equals(bindUntilEvent);
                    }
                })
                .onErrorReturn(Functions.RESUME_FUNCTION)
                .filter(Functions.SHOULD_COMPLETE);
        }
    

    这个takeUntilCorrespondingEvent就是生命周期判断,内部使用了combineLatest操作符,简单的说就是该操作符接收多个Observable以及一个函数作为参数,并且函数的签名为这些Observable发射的数据类型。当以上的任意一个Observable发射数据之后,会去取其它Observable 最近一次发射的数据,回调到函数当中,但是该函数回调的前提是所有的Observable都至少发射过一个数据项。

    看不懂上面的没有关系,takeUntilCorrespondingEvent的作用就是筛选过滤生命周期,那么问题又来了,这个生命周期到底是哪发射来的呢?

    其实是用了 BehaviorSubject。在特定条件下,Subject既可以发送事件,也可以接收事件,而BehaviorSubject接收到订阅前的最后一条数据和订阅后的所有数据。BehaviorSubject在Activity的每个生命周期都发射了一个生命周期事件:

        @CallSuper
        protected void onCreate(@Nullable Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            this.lifecycleSubject.onNext(ActivityEvent.CREATE);
        }
    
        @CallSuper
        protected void onStart() {
            super.onStart();
            this.lifecycleSubject.onNext(ActivityEvent.START);
        }
    ......
    

    其他生命周期也类似,不再贴出来了。如此,完整的RxLifeCycle源码执行分析到此为止。

    结语

    RxLifeCycle做为自动解除绑定的一个三方库,源码实现比较简单易读,一定程度上可以帮助我们解决Rxjava内存泄漏的问题,但不可否认的说,它也有弊端:

    • 基类需要继承RxAppCompatActivity和RxFragment等,这也是最大的弊端。这如今我们的Activity都已经封装好了一个完整的基类,但要用RxLifeCycle,又需要换成这个,限制太大。

    • 对应MVP框架结构来说,无法在P层使用,只能在V层才能使用,不符合MVP结构。
      当然也有更好的解决方式,那就是下一篇要讲的 Rxjava解除订阅②:AutoDispose

    相关文章

      网友评论

        本文标题:Rxjava解除订阅①:自带方式和RxLifeCycle

        本文链接:https://www.haomeiwen.com/subject/xmdyvhtx.html