美文网首页
RxJava 内存泄漏

RxJava 内存泄漏

作者: Kip_Salens | 来源:发表于2021-01-16 20:03 被阅读0次

    1. RxJava 内存泄漏原因

    • Disposable 基本原理:

    (1)这里仅看下 Observer 的执行,如在主线程执行 Observer,会走 Observable 的 observeOn 方法,然后会把 Observable 包装成 ObservableObserveOn。
    当被订阅者如 PublishSubject 通过 onNext 发送事件时,会调用 ObservableObserveOn 中的订阅者 ObserveOnObserver 的 onNext 方法。之后会通过 Worker 来执行 ObserveOnObserver(实现了Runnable接口) 的 run() 方法。

    (2)Worker 是每个动作的执行者,通过线程池执行,而 Worker 的创建和管理是通过 Schedulers 来完成的,在 subscribe 订阅时 Schedulers 负责创建该订阅者执行的 Worker。Schedulers 是我们在使用 RxJava 时指定的,如 Schedulers.io(),Schedulers.computation() 或 AndroidSchedulers.mainThread()。
    这里以 Schedulers.computation() 创建的 Worker 为例:

    ObserveOnObserver.oNext() --> EventLoopWorker.schedule() --> PoolWorker.scheduleActual() --> ScheduledExecutorService.submit()

    (3)scheduleActual() 函数中通过线程池执行任务,传入的 Runnable 使用 ScheduledRunnable 包装了一层,ScheduledRunnable 实现了 Callable,通过线程池执行时返回 Future,通过 Future 可以获取任务执行状态以及可以取消任务。
    scheduleActual() 返回结果是 ScheduledRunnable,ScheduledRunnable 同时实现了 Disposable 接口,在 dispose() 方法中通过 Future.cancel() 来取消任务执行。

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        // ScheduledRunnable 实现了 Disposable接口,函数返回 Disposable
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
    
        Future<?> f;
        try {
            if (delayTime <= 0) {
                // 通过线程池执行
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
    
        return sr;
    }
    
    • 上下游的 Disposable 传递
     上游  Single.just(true)
      |        .delay(2500, TimeUnit.MILLISECONDS)
      |        .subscribeOn(Schedulers.io())
      |        .observeOn(AndroidSchedulers.mainThread())
     下游      .subscribe(new Consumer<Boolean>()
    

    订阅时的基本流程:简单来说就是下游调用 subscribe(),向上游调用subscribe(),上游 subscribe() 中创建 Disposable,再往下游调用 Observer 的 onSubscribe(Disposable),下游会对上游传过来的 Disposable 进行包装,所以最终调用 dispose() 方法时,下游的 dispose() 方法中也会调用上游的 dispose() 方法。

    例如:

    (1)LambdaObserver 中 DisposableHelper.setOnce(this, d)

    (2)ObserveOnObserver 中变量 Disposable upstream;

    调用 dispose(),会调用自己的 dispose() 方法和上游的 dispose() 方法

    截屏2021-01-05 下午8.47.35.png

    2. 解决方法

    内存泄漏示例一:

    @Override
    protected void onStart() {
        super.onStart();
        mDisposable = Single.just(true)
            .delay(2500, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                // do something
              }
            }, Functions.emptyConsumer());
    }
    

    由于在 subscribe 方法中创建 Consumer 或者 Observer 时,属于匿名内部类,所以会持有外部类对象,若果外部类是 Activity 或者 Fragment 或者 View,当页面销毁或者 View 销毁时,RxJava 的线程还在执行,就会一直持有 Activity 、Fragment 或 View,导致内存泄漏。对于这种情况的解决,就是取消订阅,以及结束 RxJava 的线程执行,保证 RxJava 中的订阅者能够被回收。常见的处理方式有以下三种:

    • 解决方式一:
      @Override
      protected void onDestroy() {
        super.onDestroy();
        // 手动解除订阅
        if (mDisposable != null && !mDisposable.isDisposed()) {
          mDisposable.dispose();
          mDisposable = null;
        }
      }
    
    • 解决方式二:使用 CompositeDisposable

    CompositeDisposable 是一个 disposable 的容器,可以容纳多个 disposable,添加和去除的复杂度为O(1)。

    // 成员变量
      private CompositeDisposable mCompositeDisposable = new CompositeDisposable();
      protected void onStart() {
        super.onStart();
        mCompositeDisposable.add(dispose1);
        mCompositeDisposable.add(dispose2);
        mCompositeDisposable.add(dispose3);
        ...
     }
      @Override
      protected void onDestroy() {
        super.onDestroy();
        // 手动解除订阅
        if (mCompositeDisposable != null && !mCompositeDisposable.isDisposed()) {
          mCompositeDisposable.dispose();
        }
      }
    
    • 解决方式二:
    @Override
    protected void onStart() {
        super.onStart();
        mDisposable = Single.just(true)
            .delay(2500, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .compose(((BaseActivity)getActivity()).bindUntilEvent(ActivityEvent.DESTROY)) // 根据生命周期自动解除订阅
            .subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                // do something
              }
            }, Functions.emptyConsumer());
    }
    

    相关文章

      网友评论

          本文标题:RxJava 内存泄漏

          本文链接:https://www.haomeiwen.com/subject/bcqoaktx.html