Rxjava解除订阅三部曲:
前言
最近在维护老旧网络库的时候,发现网络库底层运用到了Rxjava,而最近凑巧又给app加上了leakcanary检测内存泄漏,发现除了网络库的Rxjava泄漏之外,还有些业务上滥用的Rxjava也存在泄漏的情况。有问题咱就得想办法解决,这个老旧的网络库有点年岁了,写的是真不咋样,奈何app里还有大量引用,该维护的还是得维护。Rxjava作为近几年非常流行的一个三方库,功能就不用多说了,谁用谁知道。
正文
Rxjava是好用,可用不好很容易造成内存泄漏。而且理论上Rxjava的每个操作符都可能会造成内存泄漏。举个例子,我们用Rx进行网络请求,然后订阅在主线程进行ui更新。网络请求是在分线程执行,而且有延迟。当请求没有返回时,我们将这个页面关闭了,后续分线程数据回来执行ui更新,而Rx还持有外部类的引用,这就造成了内存泄漏。
解决的办法Rx本身就给提供,这就是我们要讲的第一种解除订阅的方法:
- Rxjava提供的
Dispose.dispose();
1.Dispose.dispose()
这是Rxjava本身提供的一种接触订阅的方式,使用很简单,在页面关闭的时候,或者在需要的时候调用下dispose()
方法就可以了。
如果最后的订阅者是Consumer
,那么会有一个返回值Dispose
。那么在需要的时候,就可以调用Dispose.dispose()
。但往往我们使用Rxjava的时候,都需要对正常返回和异常返回做些通用处理,使用的往往是Observer
,这样做的结果就是没有返回值。
其实Observer
也已经给我们准备好了解除订阅的方式,我们不妨看下Observer
的源码:
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
有个方法onSubscribe(@NonNull Disposable d)
,有一个Dispose
的参数,那么我们就可以接收这个变量,在在需要的时候调用Dispose.dispose()
。
这个方法onSubscribe
意味着当subscribe
方法被调用之前,就会拿到Dispose
句柄,此时Rxjava任何一个相关的操作符处理都还未执行,调用dispose()
方法后,完成解除订阅。
扩展: CompositeDisposable
上面讲的是针对单个Dispose进行订阅解除,可往往实际使用中,我们可希望看到一堆Dispose的成员变量在页面销毁的时候扎堆解除订阅。这时候就需要CompositeDisposable
,简单的理解,就是可以对Dispose进行批量的处理,类似于List集合,其内部实现方法也很类似,包括add,addAll,delete,remove,clear,dispose,isDisposed
。
方法使用都很简单,我们把Dispose1,Dispose2,Dispose3使用add
方法,添加到CompositeDisposable
中,在页面销毁时调用dispose
进行批量解除。
这里对dispose
和clear方法进行单独说明下,dispose
执行后,会改变CompositeDisposable
的状态为disposed,即已完成订阅解除状态,而clear
则只会批量解除订阅,不会改变整个CompositeDisposable
的disposed状态。我们看下源码就知道了:
dispose方法:
而clear方法:
clear
差异就在红箭头那边。
而其他的方法都要去判断disposed状态,已经disposed的直接return,不会继续执行。
2.RxLifeCycle
接下来开始,就是比较骚的操作了。RxLifeCycle,顾名思义就是对Rxjava生命周期管理,也就是意味着,Rxjava长大,已经学会了自己该何时进行解除订阅。github直达链接。
虽然没有中文文档,但摸索起来也不困难,首先添加核心依赖:
implementation 'com.trello.rxlifecycle3:rxlifecycle:3.1.0'
implementation 'com.trello.rxlifecycle3:rxlifecycle-components:3.1.0'
RxLifeCycle使用需要继承RxAppCompatActivity
,Fragment也需要继承RxFragment
,当然还有一些其他扩展,比如RxDialogFragment
等,大家自己去体验吧。
使用起来也是很简单,直接看代码吧:
Observable.just(1)
.compose(this.<Integer>bindToLifecycle())
.subscribe();
或者:
Observable.just(1)
.compose(this.<Integer>bindUntilEvent(ActivityEvent.DESTROY))
.subscribe();
核心使用就这两个方法,这俩方法也是有些许的区别。
bindToLifecycle():自动识别在合适的生命周期内解除绑定。
bindUntilEvent(ActivityEvent):在指定的生命周期内解除绑定。
对于bindUntilEvent(ActivityEvent)
很容易理解,指定一个生命周期解除绑定,但对于bindToLifecycle()
如何自动识别生命周期有些疑问,我们不妨写个demo测试下效果如何:
private void test() {
subscribe = Observable.interval(0, 2, TimeUnit.SECONDS)
.map(new Function<Long, Long>() {
@Override
public Long apply(Long aLong) throws Exception {
Log.d(TAG, "当前发射数值:" + aLong);
return aLong;
}
})
.compose(this.<Long>bindToLifecycle())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "当前接收数值:" + aLong);
}
});
}
每个2s发射一个数值,无限的发,调用bindToLifecycle()
,然后我们在onCreate
方法调用,最终打印效果:
D/zdu_Rxdemo: onCreate,subscribe.isDisposed():false
D/zdu_Rxdemo: 当前发射数值:0
D/zdu_Rxdemo: 当前接收数值:0
D/zdu_Rxdemo: onStart,subscribe.isDisposed():false
D/zdu_Rxdemo: onResume,subscribe.isDisposed():false
D/zdu_Rxdemo: 当前发射数值:1
D/zdu_Rxdemo: 当前接收数值:1
D/zdu_Rxdemo: 当前发射数值:2
D/zdu_Rxdemo: 当前接收数值:2
D/zdu_Rxdemo: onPause,subscribe.isDisposed():false
D/zdu_Rxdemo: 当前发射数值:3
D/zdu_Rxdemo: 当前接收数值:3
D/zdu_Rxdemo: onStop,subscribe.isDisposed():false
D/zdu_Rxdemo: onDestroy,subscribe.isDisposed():true
在onDestroy
方法中自动解除订阅了,而代码中并没有主动去调用dispose方法,可见自动解除订阅生效了。
那如果在onStart
方法订阅的话,解除订阅的生命周期又不一样了:
D/zdu_Rxdemo: onStart,subscribe.isDisposed():false
D/zdu_Rxdemo: 当前发射数值:0
D/zdu_Rxdemo: 当前接收数值:0
D/zdu_Rxdemo: onResume,subscribe.isDisposed():false
D/zdu_Rxdemo: 当前发射数值:1
D/zdu_Rxdemo: 当前接收数值:1
D/zdu_Rxdemo: 当前发射数值:2
D/zdu_Rxdemo: 当前接收数值:2
D/zdu_Rxdemo: onPause,subscribe.isDisposed():false
D/zdu_Rxdemo: onStop,subscribe.isDisposed():true
D/zdu_Rxdemo: onDestroy,subscribe.isDisposed():true
在onStop
生命周期内就被解除订阅了,那我们在onResume
中订阅的话,是不是就会在onPause
中解除订阅了呢?事实上确实是这样的,日志就不打印了,我们直接去看源码实现。
刚刚我们调用的是RxAppCompatActivity
的两个方法:
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
return RxLifecycle.bindUntilEvent(this.lifecycleSubject, event);
}
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindToLifecycle() {
return RxLifecycleAndroid.bindActivity(this.lifecycleSubject);
}
暂且现不管this.lifecycleSubject
是什么,我们继续向下看源码:
public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle,
@Nonnull final R event) {
checkNotNull(lifecycle, "lifecycle == null");
checkNotNull(event, "event == null");
return bind(takeUntilEvent(lifecycle, event));
}
private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
return lifecycle.filter(new Predicate<R>() {
@Override
public boolean test(R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(event);
}
});
}
bindUntilEvent
就很明确了,底层用了filter
过滤操作符,过滤了非指定生命周期。但生命周期是怎么下发下来的呢?最后再做解释。
public static <T> LifecycleTransformer<T> bindActivity(@NonNull Observable<ActivityEvent> lifecycle) {
return RxLifecycle.bind(lifecycle, ACTIVITY_LIFECYCLE);
}
bindToLifecycle
的底层源码最终跟takeUntilEvent
的底层源码一致,都指向了
RxLifecycle.bind
方法。我们就看下bind
方法到底是执行了什么?
先看bindUntilEvent
:
public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
return new LifecycleTransformer<>(lifecycle);
}
new了一个LifecycleTransformer
,在看下内部实现:
public final class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
FlowableTransformer<T, T>,
SingleTransformer<T, T>,
MaybeTransformer<T, T>,
CompletableTransformer
{
final Observable<?> observable;
LifecycleTransformer(Observable<?> observable) {
checkNotNull(observable, "observable == null");
this.observable = observable;
}
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.takeUntil(observable);
}
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST));
}
@Override
public SingleSource<T> apply(Single<T> upstream) {
return upstream.takeUntil(observable.firstOrError());
}
@Override
public MaybeSource<T> apply(Maybe<T> upstream) {
return upstream.takeUntil(observable.firstElement());
}
@Override
public CompletableSource apply(Completable upstream) {
return Completable.ambArray(upstream, observable.flatMapCompletable(Functions.CANCEL_COMPLETABLE));
}
}
原来是一个实现ObservableTransformer
等接口的类,到这里也明白了为什么RxLifeCycle要用compose
操作符,并且其内部实现使用了takeUntil
操作符,在符合条件后,打断上游链。这也说明了,我们要在订阅前一刻执行这个自动解除订阅打断上游链,而对下游链没有作用。
再看下bindToLifecycle
的bind
实现,与takeUntilEvent
稍微有点不同的是它要自动判断生命周期:
@NonNull
@CheckResult
public static <T> LifecycleTransformer<T> bindActivity(@NonNull Observable<ActivityEvent> lifecycle) {
return RxLifecycle.bind(lifecycle, ACTIVITY_LIFECYCLE);
}
而ACTIVITY_LIFECYCLE
则是一个switch取值:
private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE = new Function<ActivityEvent, ActivityEvent>() {
public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
switch(lastEvent) {
case CREATE:
return ActivityEvent.DESTROY;
case START:
return ActivityEvent.STOP;
case RESUME:
return ActivityEvent.PAUSE;
case PAUSE:
return ActivityEvent.STOP;
case STOP:
return ActivityEvent.DESTROY;
case DESTROY:
throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
default:
throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
}
}
};
这就印证了我们之前的demo,在create的时候返回是ActivityEvent.DESTORY,对应START返回的就是STOP生命周期等等。
public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle, @Nonnull final Function<R, R> correspondingEvents) {
checkNotNull(lifecycle, "lifecycle == null");
checkNotNull(correspondingEvents, "correspondingEvents == null");
return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
final Function<R, R> correspondingEvents) {
return Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new BiFunction<R, R, Boolean>() {
@Override
public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
.onErrorReturn(Functions.RESUME_FUNCTION)
.filter(Functions.SHOULD_COMPLETE);
}
这个takeUntilCorrespondingEvent
就是生命周期判断,内部使用了combineLatest
操作符,简单的说就是该操作符接收多个Observable以及一个函数作为参数,并且函数的签名为这些Observable发射的数据类型。当以上的任意一个Observable发射数据之后,会去取其它Observable 最近一次发射的数据,回调到函数当中,但是该函数回调的前提是所有的Observable都至少发射过一个数据项。
看不懂上面的没有关系,takeUntilCorrespondingEvent
的作用就是筛选过滤生命周期,那么问题又来了,这个生命周期到底是哪发射来的呢?
其实是用了 BehaviorSubject
。在特定条件下,Subject
既可以发送事件,也可以接收事件,而BehaviorSubject
接收到订阅前的最后一条数据和订阅后的所有数据。BehaviorSubject
在Activity的每个生命周期都发射了一个生命周期事件:
@CallSuper
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
this.lifecycleSubject.onNext(ActivityEvent.CREATE);
}
@CallSuper
protected void onStart() {
super.onStart();
this.lifecycleSubject.onNext(ActivityEvent.START);
}
......
其他生命周期也类似,不再贴出来了。如此,完整的RxLifeCycle源码执行分析到此为止。
结语
RxLifeCycle做为自动解除绑定的一个三方库,源码实现比较简单易读,一定程度上可以帮助我们解决Rxjava内存泄漏的问题,但不可否认的说,它也有弊端:
-
基类需要继承RxAppCompatActivity和RxFragment等,这也是最大的弊端。这如今我们的Activity都已经封装好了一个完整的基类,但要用RxLifeCycle,又需要换成这个,限制太大。
-
对应MVP框架结构来说,无法在P层使用,只能在V层才能使用,不符合MVP结构。
当然也有更好的解决方式,那就是下一篇要讲的 Rxjava解除订阅②:AutoDispose
网友评论