RxLifecycle详细解析

作者: Ruheng | 来源:发表于2018-07-08 10:29 被阅读105次

    一、介绍

    RxLifecycle目的:解决RxJava使用中的内存泄漏问题。

    例如,当使用RxJava订阅并执行耗时任务后,当Activityfinish时,如果耗时任务还未完成,没有及时取消订阅,就会导致Activity无法被回收,从而引发内存泄漏。

    为了解决这个问题,就产生了RxLifecycle,让RxJava变得有生命周期感知,使得其能及时取消订阅,避免出现内存泄漏问题。

    二、使用

    首先来介绍下RxLifecycle的使用。

    1.添加依赖

      implementation 'com.trello.rxlifecycle2:rxlifecycle:2.2.1'
      implementation 'com.trello.rxlifecycle2:rxlifecycle-android:2.2.1'
      implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.1'
    

    2.继承容器类

    Activity/Fragment需要继承RxAppCompatActivity/RxFragment,主要支持如下几种容器类:


    只需要在项目中针对base类的容器中继承实现对应的Rx类即可,这一步主要是对生命周期的回调事件进行监听。

    3.绑定容器生命周期

    Activity为例,主要有如下两种方法:

    bindUntilEvent(@NonNull ActivityEvent event)
    
    bindToLifecycle()
    

    针对Fragment也有同样的两种方法,只是方法名会有所不同。

    下面详细介绍这两种方法的区别:

    bindUntilEvent

    该方法指定在哪个生命周期方法调用时取消订阅。

    其中ActivityEvent是一个枚举类,对应于Activity的生命周期。

    public enum ActivityEvent {
    
        CREATE,
        START,
        RESUME,
        PAUSE,
        STOP,
        DESTROY
    
    }
    

    具体使用示例:

    override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
            Observable.interval(1, TimeUnit.SECONDS)
                    .doOnDispose {
                        Log.i(TAG, "Unsubscribing subscription from onDestory()")
                    }
                    .compose(bindUntilEvent(ActivityEvent.DESTROY))
                    .subscribe {
                        Log.i(TAG, "Started in onCreate(), running until in onDestroy(): $it")
                    }
        }
    

    指定在生命周期onDestory()时,取消订阅。

    bindToLifecycle

    在某个生命周期进行绑定,在对应的生命周期进行订阅解除。

    具体使用示例:

    override fun onResume() {
            super.onResume()
            Observable.interval(1, TimeUnit.SECONDS)
                    .doOnDispose {
                        Log.i(TAG, "Unsubscribing subscription from onPause()")
                    }
                    .compose(bindToLifecycle())
                    .subscribe {
                        Log.i(TAG, "Started in onResume(), running until in onPause(): $it")
                    }
        }
    

    onResume()进行绑定订阅,则在onPause()进行解除订阅,生命周期是两两对应的。

    三、原理解析

    1.compose

    首先来了解一下compose操作符。

    compose(bindToLifecycle())
    compose(bindUntilEvent(ActivityEvent.DESTROY))
    

    如上所示,两种绑定生命周期的方式,都是通过compose操作符进行实现的。

    compose一般情况下可以配合Transformer使用,以实现将一种类型的Observable转换成另一种类型的Observable,保证调用的链式结构。

    那么接下来看该操作符在RxLifecycle中的应用,从bindToLifecyclebindUntilEvent入手。

    2.BehaviorSubject

    public abstract class RxAppCompatActivity extends AppCompatActivity implements LifecycleProvider<ActivityEvent> {
    
        private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
    
        @Override
        @NonNull
        @CheckResult
        public final Observable<ActivityEvent> lifecycle() {
            return lifecycleSubject.hide();
        }
    
        @Override
        @NonNull
        @CheckResult
        public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
            return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
        }
    
        @Override
        @NonNull
        @CheckResult
        public final <T> LifecycleTransformer<T> bindToLifecycle() {
            return RxLifecycleAndroid.bindActivity(lifecycleSubject);
        }
    
        @Override
        @CallSuper
        protected void onCreate(@Nullable Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            lifecycleSubject.onNext(ActivityEvent.CREATE);
        }
    
        @Override
        @CallSuper
        protected void onStart() {
            super.onStart();
            lifecycleSubject.onNext(ActivityEvent.START);
        }
    
        @Override
        @CallSuper
        protected void onResume() {
            super.onResume();
            lifecycleSubject.onNext(ActivityEvent.RESUME);
        }
    
        @Override
        @CallSuper
        protected void onPause() {
            lifecycleSubject.onNext(ActivityEvent.PAUSE);
            super.onPause();
        }
    
        @Override
        @CallSuper
        protected void onStop() {
            lifecycleSubject.onNext(ActivityEvent.STOP);
            super.onStop();
        }
    
        @Override
        @CallSuper
        protected void onDestroy() {
            lifecycleSubject.onNext(ActivityEvent.DESTROY);
            super.onDestroy();
        }
    }
    

    RxAppCompatActivity中有一个关键对象BehaviorSubject

    BehaviorSubject会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。如下图:

    所以lifecycleSubject会根据绑定订阅的时期,不断发送接下来的生命周期事件ActivityEvent

    3.LifecycleTransformer

    接下来继续看源码,bindToLifecyclebindUntilEvent都返回了一个LifecycleTransformer对象,那么LifecycleTransformer到底有什么用?

    @ParametersAreNonnullByDefault
    public final class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
                                                          FlowableTransformer<T, T>,
                                                          SingleTransformer<T, T>,
                                                          MaybeTransformer<T, T>,
                                                          CompletableTransformer
    {
        final Observable<?> observable;
    
        LifecycleTransformer(Observable<?> observable) {
            checkNotNull(observable, "observable == null");
            this.observable = observable;
        }
    
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream.takeUntil(observable);
        }
    
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST));
        }
    
        @Override
        public SingleSource<T> apply(Single<T> upstream) {
            return upstream.takeUntil(observable.firstOrError());
        }
    
        @Override
        public MaybeSource<T> apply(Maybe<T> upstream) {
            return upstream.takeUntil(observable.firstElement());
        }
    
        @Override
        public CompletableSource apply(Completable upstream) {
            return Completable.ambArray(upstream, observable.flatMapCompletable(Functions.CANCEL_COMPLETABLE));
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) { return true; }
            if (o == null || getClass() != o.getClass()) { return false; }
    
            LifecycleTransformer<?> that = (LifecycleTransformer<?>) o;
    
            return observable.equals(that.observable);
        }
    
        @Override
        public int hashCode() {
            return observable.hashCode();
        }
    
        @Override
        public String toString() {
            return "LifecycleTransformer{" +
                "observable=" + observable +
                '}';
        }
    }
    

    LifecycleTransformer实现了各种Transformer接口,能够将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe对象。正好配合上文的compose操作符,使用在链式调用中。

    4.takeUntil

    接下来到了关键了,LifecycleTransformer到底把原来的Observable对象转换成了什么样子?

    这就需要了解takeUntil操作符了!


    当第二个Observable发射了一项数据或者终止时,丢弃原Observable发射的任何数据。所谓的第二个Observable,即传入takeUntil中的Observable对象。

    理解了该操作符的作用,那么你可能就明白了,RxLifecycle就是通过监听第二个Observable发射的数据,来解除订阅。

    那么这第二个Observable是谁?

    不就是在创建LifecycleTransformer的时候传入构造函数中的嘛,那就来寻找一下什么时候创建的该对象即可。

    从头开始捋一捋:

    public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
            return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
        }
    

    该方法返回了LifecycleTransformer对象,继续向下追溯。

    public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle,
                                                                @Nonnull final R event) {
        checkNotNull(lifecycle, "lifecycle == null");
        checkNotNull(event, "event == null");
        return bind(takeUntilEvent(lifecycle, event));
    }
    
    private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
        return lifecycle.filter(new Predicate<R>() {
            @Override
            public boolean test(R lifecycleEvent) throws Exception {
                return lifecycleEvent.equals(event);
            }
        });
    }
    

    继续追踪,马上接近真相。

    public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
        return new LifecycleTransformer<>(lifecycle);
    }
    

    在该方法中创建了该对象,并传入了一个Observable对象,通过上面方法即可知道该对象就是BehaviorSubject对象。

    那么该对象在什么时候发送第一次数据呢?

    这就要看上面的takeUntilEvent方法了。

    关键在这一句lifecycleEvent.equals(event),只有当BehaviorSubject发送的ActivityEvent的值等于解除绑定的生命周期时,才会发送第一次数据。那么当发送第一次数据时,根据上面的分析就会解除订阅的绑定。

    那么针对bindToLifecycle方法,是进行怎样的操作,使得在对应的生命周期进行解除订阅呢?

    还是继续看源码。

    public final <T> LifecycleTransformer<T> bindToLifecycle() {
            return RxLifecycleAndroid.bindActivity(lifecycleSubject);
        }
    
    public static <T> LifecycleTransformer<T> bindActivity(@NonNull final Observable<ActivityEvent> lifecycle) {
        return bind(lifecycle, ACTIVITY_LIFECYCLE);
    }
    

    其中ACTIVITY_LIFECYCLE为:

    private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE =
        new Function<ActivityEvent, ActivityEvent>() {
            @Override
            public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
                switch (lastEvent) {
                    case CREATE:
                        return ActivityEvent.DESTROY;
                    case START:
                        return ActivityEvent.STOP;
                    case RESUME:
                        return ActivityEvent.PAUSE;
                    case PAUSE:
                        return ActivityEvent.STOP;
                    case STOP:
                        return ActivityEvent.DESTROY;
                    case DESTROY:
                        throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
                    default:
                        throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
                }
            }
        };
    

    该函数的功能是会根据传入的生命周期事件,返回对应的生命周期,如CREATEDESTROY。看来通过该函数就可以实现在对应生命周期解绑了。

    不过还需要一系列操作符的协助,继续看源码。

    public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
                                                          @Nonnull final Function<R, R> correspondingEvents) {
            checkNotNull(lifecycle, "lifecycle == null");
            checkNotNull(correspondingEvents, "correspondingEvents == null");
            return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
        }
    
        private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
                                                                           final Function<R, R> correspondingEvents) {
            return Observable.combineLatest(
                lifecycle.take(1).map(correspondingEvents),
                lifecycle.skip(1),
                new BiFunction<R, R, Boolean>() {
                    @Override
                    public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                        return lifecycleEvent.equals(bindUntilEvent);
                    }
                })
                .onErrorReturn(Functions.RESUME_FUNCTION)
                .filter(Functions.SHOULD_COMPLETE);
        }
    

    详细看一下takeUntilCorrespondingEvent方法。

    5.take

    首先看一下take操作符,很简单。

    take(int)用一个整数n作为一个参数,只发射前面的n项,如下图:

    那么对应lifecycle.take(1).map(correspondingEvents),即获取发送的第一个生命周期事件,再通过上面对应的函数,转换为响应的生命周期。如果在onCreate中进行绑定,那么第一个发送的就是CREATE,返回的就是对应的DESTORY

    6.skip

    skip(int)忽略Observable发射的前n项数据

    lifecycle.skip(1),如果在onCreate中进行绑定,那么剩余的就是STARTRESUMEPAUSESTOPDESTROY

    7. combineLatest

    最后还需要一个关键的操作符combineLatest,来完成对应生命周期的解除订阅。

    combineLatest操作符可以将2~9个Observable发射的数据组装起来然后再发射出来。不过还有两个前提:

    • 所有的Observable都发射过数据。
    • 满足上面条件的时候任何一个Observable发射一个数据,就将所有Observable最新发射的数据按照提供的函数组装起来发射出去。

    具体示例,如下图所示:

    按照第三个参数的函数,将lifecycle.take(1).map(correspondingEvents)lifecycle.skip(1),进行combine

    new BiFunction<R, R, Boolean>() {
                    @Override
                    public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                        return lifecycleEvent.equals(bindUntilEvent);
                    }
                }
    

    那么结果是

    false,false,false,false,true
    

    之后的onErrorReturnfilter是对异常的处理和判断是否应该结束订阅:

    //异常处理
    static final Function<Throwable, Boolean> RESUME_FUNCTION = new Function<Throwable, Boolean>() {
            @Override
            public Boolean apply(Throwable throwable) throws Exception {
                if (throwable instanceof OutsideLifecycleException) {
                    return true;
                }
    
                //noinspection ThrowableResultOfMethodCallIgnored
                Exceptions.propagate(throwable);
                return false;
            }
        };
        //是否应该取消订阅,依赖于上游的boolean
        static final Predicate<Boolean> SHOULD_COMPLETE = new Predicate<Boolean>() {
            @Override
            public boolean test(Boolean shouldComplete) throws Exception {
                return shouldComplete;
            }
        };
    

    所以,按照上面的例子,如果在onCreate()方法中进行绑定,那么在onDestory()方法中就会对应的解除订阅。

    四、总结

    通过上面的分析,可以了解RxLifecycle的使用以及原理。

    学习RxLifecycle的过程中,更加体会到了对于观察者模式的使用,以及RxJava操作符的强大,各种操作符帮我们实现一些列的转换。

    相关文章

      网友评论

        本文标题:RxLifecycle详细解析

        本文链接:https://www.haomeiwen.com/subject/cttouftx.html