【RxJava】- RxLifecycle解决RxJava内存泄

作者: 拔萝卜占坑 | 来源:发表于2020-03-31 16:52 被阅读0次

目录

【RxJava】- 创建操作符源码分析
【RxJava】- 变换操作符源码分析
【RxJava】- 过滤操作符源码分析
【RxJava】- 结合操作符源码分析
【RxJava】- 连接操作符源码分析

简介

在Activity中使用RxJava的时候,由于回调,RxJava持有Activity引用。当Activity销毁时,RxJava中的耗时任务还没有完成,如果这时候没有收到调用对应的dispose()方法,那么RxJava中持有的资源得不到释放,从而引起Activity的内存泄露。如果在Activity中手动调用,这样麻烦又不优雅,所以这时候可以使用RxLifecycle来解决。

使用

具体使用可以参考RxLifecycle,下面注意讲解RxLifecycle源码,带你一步步了解RxLifecycle实现的真相。

流程

分析之前,先用RxJava创建一个观察者模型任务。

Observable.create(emitter -> {}).compose(bindToLifecycle()).subscribe();

这里只分析bindToLifecycle()方法中的内容,其它都是RxJava中的操作符。

跟踪到

public static <T> LifecycleTransformer<T> bindActivity(@NonNull Observable<ActivityEvent> lifecycle) {
    return RxLifecycle.bind(lifecycle, ACTIVITY_LIFECYCLE);
}
  • ACTIVITY_LIFECYCLE:Function实例,对Activity生命周期时间做映射。
  • lifecycle:是新创建的BehaviorSubject实例。

下一步

public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
                                                  @Nonnull final Function<R, R> correspondingEvents) {
    ...
    return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}

takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents)方法构建一个新的Observable实例,即ObservableFilter对象。·

private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
                                                                   final Function<R, R> correspondingEvents) {
    return Observable.combineLatest(
        lifecycle.take(1).map(correspondingEvents),
        lifecycle.skip(1),
        new BiFunction<R, R, Boolean>() {
            @Override
            public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                return lifecycleEvent.equals(bindUntilEvent);
            }
        })
        .onErrorReturn(Functions.RESUME_FUNCTION)
        .filter(Functions.SHOULD_COMPLETE);
}

参数lifecycle.share()

public final Observable<T> share() {return publish().refCount();}
public Observable<T> refCount() {
    return RxJavaPlugins.onAssembly(new ObservableRefCount<T>(onRefCount()));
}
    private ConnectableObservable<T> onRefCount() {
        if (this instanceof ObservablePublishClassic) {
            return RxJavaPlugins.onAssembly(
                    new ObservablePublishAlt<T>(((ObservablePublishClassic<T>)this).publishSource())
                   );
        }
        return this;
    }

最后调用bind(@Nonnull final Observable<R> lifecycle)返回一个持有上面创建的Observable实例的LifecycleTransformer对象。

  • subscribe()
    订阅

发射Activity生命周期事件

发射Activity生命周期事件,封装RxLifecycle中的RxAppCompatActivity类里面(我继承的是RxAppCompatActivity)。比如onCreate:

protected void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    this.lifecycleSubject.onNext(ActivityEvent.CREATE);
}

onNext方法

public void onNext(T t) {
    ...
    Object o = NotificationLite.next(t);
    setCurrent(o);
    for (BehaviorDisposable<T> bs : subscribers.get()) {
        bs.emitNext(o, index);
    }
}

subscribers中的值是一个BehaviorDisposable数组,在subscribeActual方法中进行添加。

protected void subscribeActual(Observer<? super T> observer) {
    BehaviorDisposable<T> bs = new BehaviorDisposable<T>(observer, this);
    if (add(bs)) {...};
    ...
}

emitNext方法中调用 test(value)来发射数据,如果已经有数据处于发射中,者将数据保存起来,然后返回。

public boolean test(Object o) {
    return cancelled || NotificationLite.accept(o, downstream);
}
public static <T> boolean accept(Object o, Observer<? super T> observer) {
    if (o == COMPLETE) {
        observer.onComplete();
        return true;
    } elseif (o instanceof ErrorNotification) {
        observer.onError(((ErrorNotification)o).e);
        return true;
    }
    observer.onNext((T)o);
    return false;
}

调用

Observable.create(emitter -> {}).compose(bindToLifecycle()).subscribe();
  • .subscribe()
    执行.compose中返回实例的subscribeActual(Observer<? super T> observer)方法,observer是.subscribe()传入的订阅实例,即观察者。

  • .compose(bindToLifecycle())

    bindToLifecycle()返回的是LifecycleTransformer实例。看一下compose(bindToLifecycle())实现。

    public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
    return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
    }
    
    public static <T> Observable<T> wrap(ObservableSource<T> source) {
      ObjectHelper.requireNonNull(source, "source is null");
      if (source instanceof Observable) {
          return RxJavaPlugins.onAssembly((Observable<T>)source);
      }
      return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
    }
    

    首先调用LifecycleTransformer的apply方法,传入被观察者对象,即.create返回的实例。
    apply方法做了什么

    public ObservableSource<T> apply(Observable<T> upstream) {
       return upstream.takeUntil(observable);
    }
    

    对被观察者使用takeUntil操作符,传入值为takeUntilCorrespondingEvent返回的ObservableFilter实例,新创建一个ObservableTakeUntil实例并返回。

    由于ObservableTakeUntil是Observable的子类,所以.compose返回ObservableTakeUntil对象。

  • ObservableTakeUntil
    接下来就执行到ObservableTakeUntil中的subscribeActual(Observer<? super T> child)方法。

    ObservableTakeUntil中的两个参数:

    • other
      takeUntilCorrespondingEvent返回的ObservableFilter实例
    • source
      被观察者,即.create返回的实例。
    public void subscribeActual(Observer<? super T> child) {
      TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<T, U>(child);
      child.onSubscribe(parent);
      other.subscribe(parent.otherObserver);
      source.subscribe(parent);
    }
    

    child是观察者实例,即.subscribe()中的订阅者。

    调用观察者的onSubscribe方法,传入持有观察者实例TakeUntilMainObserver实例。

    调用ObservableFilter的subscribe方法,调用被观察者(即.create返回的实例)中的subscribe方法。前者传入OtherObserver实例,后者传入TakeUntilMainObserver实例。

    看一下takeUntilCorrespondingEvent方法:

    private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
                                                                     final Function<R, R> correspondingEvents) {
      return Observable.combineLatest(
          lifecycle.take(1).map(correspondingEvents),
          lifecycle.skip(1),
          new BiFunction<R, R, Boolean>() {
              @Override
              public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                  return lifecycleEvent.equals(bindUntilEvent);
              }
          })
          .onErrorReturn(Functions.RESUME_FUNCTION)
          .filter(Functions.SHOULD_COMPLETE);
    }
    

    这里面的逻辑什么触发呢,当在Activity生命周期发射事件时,也就是上面讲的RxAppCompatActivity生命周期发射事件。

    .filter返回的是ObservableFilter对象。而ObservableFilter中的downstream是ObservableTakeUntil中的OtherObserver实例。看一下ObservableFilter中的onNext方法。

    public void onNext(T t) {
      if (sourceMode == NONE) {
          boolean b;
          try {
              b = filter.test(t);
          } catch (Throwable e) {
              fail(e);
              return;
          }
          if (b) {
              downstream.onNext(t);
          }
      } else {
          downstream.onNext(null);
      }
    }
    

    boolean b = filter.test(t)是使用combineLatest操作符返回的值,规则是,当需要观察的生命周期事件(即ifecycle.take(1))和在Activity中发射的生命周期事件相等时返回true,否则返回false。

    比如观察的是onStop生命周期,那么观察的事件就是ActivityEvent.STOP,当Activity调用onStop时,那么filter.test(t)返回true。

    downstream.onNext(t)调用ObservableTakeUntil中的onNext方法。

    public void onNext(U t) {
        DisposableHelper.dispose(this);
        otherComplete();
    }
    

    这里基本上就是去清理RxJava的相关数据了。其它情况类似。

  • bindUntilEvent(@NonNull ActivityEvent event)
    指定观察那个生命周期事件。

其它

// If you want pre-written support preference Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle3:rxlifecycle-components-preference:3.1.0'
// If you want to use Android Lifecycle for providers
implementation 'com.trello.rxlifecycle3:rxlifecycle-android-lifecycle:3.1.0'

这两个是在其它情况下事情 ,具体自己看库里面的代码,很少。

相关文章

网友评论

    本文标题:【RxJava】- RxLifecycle解决RxJava内存泄

    本文链接:https://www.haomeiwen.com/subject/rlvwuhtx.html