美文网首页
AutoDispose 源码解析

AutoDispose 源码解析

作者: leilifengxingmw | 来源:发表于2020-03-23 10:20 被阅读0次

    AutoDispose是RxJava2+的一个工具,通过disposal或cancellation将RxJava流的执行自动绑定到提供的作用域。

    说人话就是:

    AutoDispose是配合RxJava2使用的一个工具,我们提供一个作用域(在Android中,Activity的生命周期就是可以看作一个作用域)。AutoDispose会自动将RxJava的数据流和我们提供的作用域进行绑定,当作用域到达结束状态的时候(比如onDestroy),AutoDispose会自动取消RxJava数据流的执行,在Android中可以用来避免RxJava造成的内存泄漏。

    举个例子,我们间隔一秒输出一个整数,可以发现当我们点击返回键的时候,数字还是会一直打印,造成内存泄漏。

    代码使用Kotlin演示

    Observable.interval(1, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(object : Observer<Long> {
                        override fun onComplete() {
                            Log.d(TAG, "onComplete: ")
                        }
                        override fun onSubscribe(d: Disposable) {
                            Log.d(TAG, "onSubscribe: ")
                        }
    
                        override fun onNext(t: Long) {
                            Log.d(TAG, "onNext: $t")
                        }
    
                        override fun onError(e: Throwable) {
                            Log.d(TAG, "onError: ${e.message}")
                        }
                    })
    

    一直输出

    D/AutoDisposeActivity: onSubscribe: 
    D/AutoDisposeActivity: onNext: 0
    D/AutoDisposeActivity: onNext: 1
    D/AutoDisposeActivity: onNext: 2
    D/AutoDisposeActivity: onNext: 3
    D/AutoDisposeActivity: onNext: 4
    D/AutoDisposeActivity: onNext: 5
    D/AutoDisposeActivity: onNext: 6
    ...
    

    如果解决这种内存泄漏问题呢,我们可以保存Disposable的引用,然后在onDestroy的时候手动调用Disposable的dispose方法,如下所示

        var disposable: Disposable? = null
    
        fun onClick(view: View) {
            Observable.interval(1, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(object : Observer<Long> {
                        override fun onComplete() {
                            Log.d(TAG, "onComplete: ")
                        }
    
                        override fun onSubscribe(d: Disposable) {
                            Log.d(TAG, "onSubscribe: ")
                            disposable = d
                        }
    
                        override fun onNext(t: Long) {
                            Log.d(TAG, "onNext: $t")
                        }
    
                        override fun onError(e: Throwable) {
                            Log.d(TAG, "onError: ${e.message}")
                        }
    
                    })
    
        }
    
        override fun onDestroy() {
            super.onDestroy()
            disposable?.dispose()
        }
    

    通过这种方式虽然能解决问题,但是每次都手写,也是太枯燥了吧。所以AutoDispose便应运而生了。使用方式如下所示:

    Observable.interval(1, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            //注释1处
            .`as`(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
            .subscribe(object : Observer<Long> {
                    override fun onComplete() {
                        Log.d(TAG, "onComplete: ")
                    }
    
                    override fun onSubscribe(d: Disposable) {
                        Log.d(TAG, "onSubscribe: ")
                    }
    
                    override fun onNext(t: Long) {
                        Log.d(TAG, "onNext: $t")
                    }
    
                    override fun onError(e: Throwable) {
                        Log.d(TAG, "onError: ${e.message}")
                    }
        })
    

    注释1处,一行代码,就解决,也是美滋滋。

    //传入的this对象是一个Activity对象
    .`as`(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
    

    下面就探索一下其中实现的原理:

    先来一张图


    AutoDispose.jpg

    AndroidLifecycleScopeProvider的from方法

    public static AndroidLifecycleScopeProvider from(LifecycleOwner owner) {
        return from(owner.getLifecycle());
    }
    

    传入的LifecycleOwner对象在这个例子中是一个Activity,owner.getLifecycle()就是返回和Activity关联的Lifecycle对象,然后内部调用重载方法

    public static AndroidLifecycleScopeProvider from(Lifecycle lifecycle) {
        return from(lifecycle, DEFAULT_CORRESPONDING_EVENTS);
    }
    

    传入的DEFAULT_CORRESPONDING_EVENTS对象

    private static final CorrespondingEventsFunction<Lifecycle.Event> DEFAULT_CORRESPONDING_EVENTS =
          lastEvent -> {
            switch (lastEvent) {
              case ON_CREATE:
                return Lifecycle.Event.ON_DESTROY;
              case ON_START:
                return Lifecycle.Event.ON_STOP;
              case ON_RESUME:
                return Lifecycle.Event.ON_PAUSE;
              case ON_PAUSE:
                return Lifecycle.Event.ON_STOP;
              case ON_STOP:
              case ON_DESTROY:
              default:
                throw new LifecycleEndedException("Lifecycle has ended! Last event was " + lastEvent);
            }
        };
    

    CorrespondingEventsFunction是一个接口,上面是lambda表达式写法

    public interface CorrespondingEventsFunction<E> extends Function<E, E> {
    
      /**
       * 指定一个event,返回对应的event,收到对应的event的时候,lifecycle应该dispose
       *
       * @param event the source or start event.
       * @return the target event that should signal disposal.
       * @throws OutsideScopeException if the lifecycle exceeds its scope boundaries.
       */
      @Override
      E apply(E event) throws OutsideScopeException;
    }
    

    从DEFAULT_CORRESPONDING_EVENTS可以看出来:

    1. 在ON_CREATE中订阅会在ON_DESTROY中取消订阅
    2. 在ON_START中订阅会在ON_STOP中取消订阅
    3. 在ON_RESUME中订阅会在ON_PAUSE中取消订阅
    4. 在ON_PAUSE中订阅会在ON_STOP中取消订阅
    5. 如果在ON_STOP或者ON_DESTROY中订阅,直接抛出异常。
    public static AndroidLifecycleScopeProvider from(Lifecycle lifecycle,
                 CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
        return new AndroidLifecycleScopeProvider(lifecycle, boundaryResolver);
    }
    
    private AndroidLifecycleScopeProvider(
        Lifecycle lifecycle, CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
        //注释1处
        this.lifecycleObservable = new LifecycleEventsObservable(lifecycle);
        this.boundaryResolver = boundaryResolver;
    }
    

    注释1处,创建了一个LifecycleEventsObservable对象,从名称就可以看出来,这个Observable是用来观察生命周期事件的。

    最后就是返回了一个AndroidLifecycleScopeProvider对象

    AutoDispose的autoDisposable(final ScopeProvider provider)方法

    public static <T> AutoDisposeConverter<T> autoDisposable(final ScopeProvider provider) {
        //调用重载函数
        return autoDisposable(completableOf(provider));
    }
    

    先是调用ScopescompletableOf(ScopeProvider scopeProvider)方法,返回了一个Completable对象,然后调用AutoDispose的重载函数

    public static Completable completableOf(ScopeProvider scopeProvider) {
        return Completable.defer(
            () -> {
              try {
                //注释1处
                return scopeProvider.requestScope();
              } catch (OutsideScopeException e) {
                Consumer<? super OutsideScopeException> handler =
                    AutoDisposePlugins.getOutsideScopeHandler();
                if (handler != null) {
                  handler.accept(e);
                  return Completable.complete();
                } else {
                  return Completable.error(e);
                }
              }
            });
      }
    

    注释注释1处,调用scopeProvider的requestScope方法获取一个作用域。在这个例子中,scopeProvider是一个AndroidLifecycleScopeProvider对象。

    AutoDispose的autoDisposable(final CompletableSource scope)方法简化版

    public static <T> AutoDisposeConverter<T> autoDisposable(final CompletableSource scope) {
        //注释1处,返回了一个AutoDisposeConverter对象
        return new AutoDisposeConverter<T>() {
                
            //...
    
            //注释2处
            @Override
            public ObservableSubscribeProxy<T> apply(final Observable<T> upstream) {
                //...
                //注释3处
                return new ObservableSubscribeProxy<T>() {
                        
                    //...
                    @Override
                    public void subscribe(Observer<? super T> observer) {
                        //注释4处
                        new AutoDisposeObservable<>(upstream, scope).subscribe(observer);
                    }
                };
            }
        };
    }
    

    注释1处,返回了一个AutoDisposeConverter对象

    Observable的as方法简化版

    public final <R> R as(ObservableConverter<T, ? extends R> converter) {
          return converter.apply(this);
    }
    

    内部就是调用传入的converter的apply方法,将上游的Observable转化成另外一个值。在这个例子中,就是将上游的Observable转化成ObservableSubscribeProxy对象,如注释2处所示。

    ObservableConverter的apply方法

    public interface ObservableConverter<T, R> {
        R apply(Observable<T> upstream);
    }
    

    AutoDisposeConverter接口

    public interface AutoDisposeConverter<T>
        extends FlowableConverter<T, FlowableSubscribeProxy<T>>,
            ParallelFlowableConverter<T, ParallelFlowableSubscribeProxy<T>>,
            ObservableConverter<T, ObservableSubscribeProxy<T>>,
            MaybeConverter<T, MaybeSubscribeProxy<T>>,
            SingleConverter<T, SingleSubscribeProxy<T>>,
            CompletableConverter<CompletableSubscribeProxy> {
    
    }
    

    从AutoDisposeConverter接口的继承结构可以看出来,AutoDisposeConverter可以转换的上游的数据类型有6种。

    • Flowable
    • ParallelFlowable
    • Observable
    • Maybe
    • Single
    • Completable

    我们省略了大部分代码,只关注上游数据类型是Observable的情况。

    注释2处,当上游数据类型是Observable的时候,AutoDisposeConverter的apply方法返回了一个ObservableSubscribeProxy对象。

    然后在注释4处,ObservableSubscribeProxy的subscribe(Observer<? super T> observer)方法

    @Override
    public void subscribe(Observer<? super T> observer) {
        new AutoDisposeObservable<>(upstream, scope).subscribe(observer);
    }
    

    注意了:这里使用上游的Observable和传入的作用域scope构建了一个AutoDisposeObservable对象,然后调用subscribe方法订阅下游的观察者observer。

    然后我们查看一下AutoDisposeObservable类

    final class AutoDisposeObservable<T> extends Observable<T> implements 
       ObservableSubscribeProxy<T> {
            private final ObservableSource<T> source;
            private final CompletableSource scope;
    
      AutoDisposeObservable(ObservableSource<T> source, CompletableSource scope) { 
        //上游数据
        this.source = source;
        //作用域对象
        this.scope = scope;
      }
    
      @Override
      protected void subscribeActual(Observer<? super T> observer) {
        //注释1处
        source.subscribe(new AutoDisposingObserverImpl<>(scope, observer));
      }
    }
    

    注释1处,使用传入的作用域scope和下游的观察者observer包装成一个AutoDisposingObserverImpl对象,然后上游观察者source订阅AutoDisposingObserverImpl对象。

    AutoDisposingObserverImpl部分代码

    final class AutoDisposingObserverImpl<T> extends AtomicInteger implements AutoDisposingObserver<T> {
    
      final AtomicReference<Disposable> mainDisposable = new AtomicReference<>();
    
      final AtomicReference<Disposable> scopeDisposable = new AtomicReference<>();
    
      private final AtomicThrowable error = new AtomicThrowable();
      // 传入的作用域对象scope
      private final CompletableSource scope;
      //下游的代理观察者
      private final Observer<? super T> delegate;
    
      AutoDisposingObserverImpl(CompletableSource scope, Observer<? super T> delegate) {
        this.scope = scope;
        this.delegate = delegate;
      }
    
      //.... 
    }
    

    首先看一下AutoDisposingObserverImpl的onNext方法

    @Override
    public void onNext(T value) {
        if (!isDisposed()) {
          //注释1处
          if (HalfSerializer.onNext(delegate, value, this, error)) {
            // Terminal event occurred and was forwarded to the delegate, so clean up here
            mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
            AutoDisposableHelper.dispose(scopeDisposable);
          }
        }
    }
    

    注释1处,当没有取消订阅的时候,调用HalfSerializer的onNext方法,向下游观察者发送数据

    public static <T> boolean onNext(Observer<? super T> observer, T value, AtomicInteger wip, AtomicThrowable error) {
        //处理多线程的情况
        if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
          //向下游发射数据
          observer.onNext(value);
          //如果处理多线程出了问题,调用下游观察者的onError或onComplete方法,并返回true。
          if (wip.decrementAndGet() != 0) {
            Throwable ex = error.terminate();
            if (ex != null) {
              observer.onError(ex);
            } else {
              observer.onComplete();
            }
            return true;
          }
        }
        return false;
    }
    

    正常情况下,向下游发射数据成功并返回fasle。如果处理多线程出了问题,调用下游观察者的onError或onComplete方法,并返回true。如果返回了true,那么AutoDisposingObserverImpl就会将自身状态置为isDisposed,就不会再向下游发射数据了,也不会调用下游观察者的的onError或onComplete方法。

    到这里我们要明白一点:如果AutoDisposingObserverImpl自身状态为isDisposed的时候,就不会调用我们下游观察者的onNext,onError或onComplete方法了,即切断了下游观察者的订阅,这样就避免了内存泄漏。

    AutoDisposingObserverImpl是何时将自身状态置为isDisposed呢?当然是在收到适当的生命周期事件的时候呀,我们继续往下看。

    AutoDisposingObserverImpl的onSubscribe方法,这里才是生命周期作用域发挥作用的方法。

    @Override
    public void onSubscribe(final Disposable d) {
        //注释1处
        DisposableCompletableObserver o =
            new DisposableCompletableObserver() {
                @Override
                public void onError(Throwable e) {
                    scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
                    //将自身状态置为isDisposed
                    AutoDisposingObserverImpl.this.onError(e);
                }
    
                @Override
                public void onComplete() {
                    scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
                    //将自身状态置为isDisposed
                    AutoDisposableHelper.dispose(mainDisposable);
                }
            };
        //注释2处
        if (AutoDisposeEndConsumerHelper.setOnce(scopeDisposable, o, getClass())) {
            //注释3处
            delegate.onSubscribe(this);
            //注释4处
            scope.subscribe(o);
            //注释5处
            AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
        }
    }
    

    注释1处,初始化了一个DisposableCompletableObserver对象o,在该对象的onError方法和onComplete方法中会将AutoDisposingObserverImpl当前状态置为isDisposed。

    注释2处,为scopeDisposable赋值为o
    注释3处,调用下游观察者的onSubscribe方法。
    注释4处,scope订阅DisposableCompletableObserver对象o。在这个例子中,scope就是AndroidLifecycleScopeProvider的requestScope方法返回的对象。
    注释5处,为mainDisposable赋值为d

    注意:注释4处,scope订阅DisposableCompletableObserver对象o,可以猜测,当作用域对象scope处于某个状态的时候,会调用对象o的onError方法或onComplete方法,让AutoDisposingObserverImpl将自身状态置为isDisposed,从而切断上游观察者和下游观察者的联系,也就是取消订阅。

    在这个例子中,scope就是AndroidLifecycleScopeProvider的requestScope方法返回的对象。

    AndroidLifecycleScopeProvider的requestScope方法

    @Override
    public CompletableSource requestScope() {
        return LifecycleScopes.resolveScopeFromLifecycle(this);
    }
    

    LifecycleScopes的resolveScopeFromLifecycle方法

    public static <E> CompletableSource resolveScopeFromLifecycle(
          final LifecycleScopeProvider<E> provider) throws OutsideScopeException {
        return resolveScopeFromLifecycle(provider, true);
    }
    
    public static <E> CompletableSource resolveScopeFromLifecycle(
          final LifecycleScopeProvider<E> provider, final boolean checkEndBoundary)
          throws OutsideScopeException {
        //注释1处
        E lastEvent = provider.peekLifecycle();
        //注释2处
        CorrespondingEventsFunction<E> eventsFunction = provider.correspondingEvents();
        if (lastEvent == null) {
          throw new LifecycleNotStartedException();
        }
        E endEvent;
        try {
          //注释3处
          endEvent = eventsFunction.apply(lastEvent);
        } catch (Exception e) {
          if (checkEndBoundary && e instanceof LifecycleEndedException) {
            Consumer<? super OutsideScopeException> handler =
                AutoDisposePlugins.getOutsideScopeHandler();
            if (handler != null) {
              try {
                handler.accept((LifecycleEndedException) e);
    
                // Swallowed the end exception, just silently dispose immediately.
                //返回数据类型Completable.complete()
                return Completable.complete();
              } catch (Exception e1) {
                //返回数据类型Completable.error()
                return Completable.error(e1);
              }
            }
            throw e;
          }
          //注释4处
          return Completable.error(e);
        }
        //注释5处,调用重载方法
        return resolveScopeFromLifecycle(provider.lifecycle(), endEvent);
    }
    

    注释1处,AndroidLifecycleScopeProvider的peekLifecycle方法

    @Override
    public Lifecycle.Event peekLifecycle() {
        lifecycleObservable.backfillEvents();
        return lifecycleObservable.getValue();
    }
    

    我们不去仔细分析,这个方法就是获取当前的Lifecycle.Event。

    注释2处,eventsFunction就是AndroidLifecycleScopeProvider类中的DEFAULT_CORRESPONDING_EVENTS。

    注释3处,获取对应的取消订阅的Lifecycle.Event。

    然后我们注意到,返回的数据有三种类型

    1. Completable.complete()
    2. Completable.error()
    3. 注释5处返回的数据类型

    前两种类型,会导致DisposableCompletableObserver对象立即complete或者error,从而让AutoDisposingObserverImpl将状态置为isDisposed,从而切断上下游的联系。我们再看看注释5处

    注释5处,调用重载方法

    public static <E> CompletableSource resolveScopeFromLifecycle(
          Observable<E> lifecycle, final E endEvent) {
        @Nullable Comparator<E> comparator = null;
        if (endEvent instanceof Comparable) {
          //noinspection unchecked
          comparator = (Comparator<E>) COMPARABLE_COMPARATOR;
        }
        //注释1处,调用重载方法
        return resolveScopeFromLifecycle(lifecycle, endEvent, comparator);
      }
    

    注释1处,调用重载方法,传入的comparator是null

    public static <E> CompletableSource resolveScopeFromLifecycle(
          Observable<E> lifecycle, final E endEvent, final Comparator<E> comparator) {
        Predicate<E> equalityPredicate;
        if (comparator != null) {
          equalityPredicate = e -> comparator.compare(e, endEvent) >= 0;
        } else {
          //注释1处
          equalityPredicate = e -> e.equals(endEvent);
        }
        //注释2处,如果条件满足,会调用下游观察者的onComplete方法
        return lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();
      }
    

    注释1处,equalityPredicate是lambda表达式写法

    public interface Predicate<T> {
        /**
         * 测试指定的输入值,返回一个boolean值。
         */
        boolean test(@NonNull T t) throws Exception;
    }
    

    注释2处,传入的lifecycle是是AndroidLifecycleScopeProvider类中的lifecycleObservable对象

    private AndroidLifecycleScopeProvider(Lifecycle lifecycle, 
                CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
        this.lifecycleObservable = new LifecycleEventsObservable(lifecycle);
        this.boundaryResolver = boundaryResolver;
    }
    
    return lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();
    

    注释2处的这行代码含义就是:

    1. 先忽略一个当前的生命周期事件,为什么要忽略一个呢?暂时还没明白。
    2. takeUntil:返回一个ObservableTakeUntilPredicate,每发射一个元素以后,都会判断equalityPredicate条件是否满足,如果满足就发射一个complete事件。
    3. ignoreElements:返回了一个ObservableIgnoreElementsCompletable对象,忽略takeUntil发射的正常元素,只接收complete事件,或者error事件,这种情况下,会导致DisposableCompletableObserver对象立即complete或者error,让AutoDisposingObserverImpl将状态置为isDisposed,从而切断上下游的联系。

    我们看看LifecycleEventsObservable类,我们知道当Observable调用subscribe方法订阅观察者的时候,内部会调用subscribeActual方法。

    class LifecycleEventsObservable extends Observable<Event> {
    
      private final Lifecycle lifecycle;
      private final BehaviorSubject<Event> eventsObservable = BehaviorSubject.create();
    
      LifecycleEventsObservable(Lifecycle lifecycle) {
        //传入的是和Activity关联的生命周期
        this.lifecycle = lifecycle;
      }
    
     //...
    
      @Override
      protected void subscribeActual(Observer<? super Event> observer) {
        ArchLifecycleObserver archObserver =
            new ArchLifecycleObserver(lifecycle, observer, eventsObservable);
        observer.onSubscribe(archObserver);
        if (!isMainThread()) {
          observer.onError(
              new IllegalStateException("Lifecycles can only be bound to on the main thread!"));
          return;
        }
       //注释1处
        lifecycle.addObserver(archObserver);
        if (archObserver.isDisposed()) {
          lifecycle.removeObserver(archObserver);
        }
      }
      //ArchLifecycleObserver实现了LifecycleObserver接口
      static final class ArchLifecycleObserver extends MainThreadDisposable
          implements LifecycleObserver {
        private final Lifecycle lifecycle;
        private final Observer<? super Event> observer;
        private final BehaviorSubject<Event> eventsObservable;
    
        ArchLifecycleObserver(
            Lifecycle lifecycle,
            Observer<? super Event> observer,
            BehaviorSubject<Event> eventsObservable) {
          this.lifecycle = lifecycle;
          this.observer = observer;
          this.eventsObservable = eventsObservable;
        }
    
        @Override
        protected void onDispose() {
          lifecycle.removeObserver(this);
        }
    
        @OnLifecycleEvent(Event.ON_ANY)
        void onStateChange(LifecycleOwner owner, Event event) {
          if (!isDisposed()) {
            if (!(event == ON_CREATE && eventsObservable.getValue() == event)) {
              // Due to the INITIALIZED->ON_CREATE mapping trick we do in backfill(),
              // we fire this conditionally to avoid duplicate CREATE events.
              eventsObservable.onNext(event);
            }
            //注释2处
            observer.onNext(event);
          }
        }
      }
    }
    

    首先ArchLifecycleObserver实现了LifecycleObserver接口,然后在注释2处使用OnLifecycleEvent注解了onStateChange方法,所以当Android生命周期发生改变的时候,都会回调这个方法,当发射了相应的生命周期事件以后,比如说onDestroy的时候
    lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();条件满足,takeUntil会发射一个complete事件,从而导致DisposableCompletableObserver回调onComplete方法,从而导致AutoDisposingObserverImpl状态变为isDisposed,从而切断上下游的联系,避免内存泄漏。

    疑问:在LifecycleScopes类的resolveScopeFromLifecycle方法中

    lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements()
    

    为什么这里要skip一个事件呢?,目前还没想明白。

    参考链接:

    相关文章

      网友评论

          本文标题:AutoDispose 源码解析

          本文链接:https://www.haomeiwen.com/subject/uxekyhtx.html