美文网首页Android Developers
Rxjava订阅和取消流程分析

Rxjava订阅和取消流程分析

作者: A_si | 来源:发表于2020-04-07 09:03 被阅读0次

    事件序列

    简单使用:

    Observable.just(1).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Observable:

    Observable<Integer> just = Observable.just(1); 是一个被观察者,看看just做了什么:

      public static <T> Observable<T> just(T item) {
            ObjectHelper.requireNonNull(item, "item is null");
            return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
        }
    
        @NonNull
        public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
    

    onAssembly参数new了一个ObservableJust,函数体只是创建一个钩子函数。查下ObservableJust

    public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    
        private final T value;
        public ObservableJust(final T value) {
            this.value = value;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
            observer.onSubscribe(sd);
            sd.run();
        }
    
        @Override
        public T call() {
            return value;
        }
    }
    

    把发射的数据保存为value,然后重写subscribeActual,记住这个函数,继续查看订阅发生了什么:

    订阅:

      public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    一些判空,observer = RxJavaPlugins.onSubscribe(this, observer);也是创建钩子,然后调用了subscribeActual,这个才是订阅发生的真实事件,返回查看这个方法,一共有三部:

    1. new了ScalarDisposable对象,
    2. 调用observer.onSubscribe(sd)
    3. 调用第一步new出来的对象的run方法,也就是observer.onNext(value);,如果发射完成还会调用observer.onComplete();

    这是一个简单的订阅流程。上游发射数据,订阅的时候,observe调用自己的方法。

    数据转换

    上面流程加了map操作:

            Observable.just(1)
                    .map(new Function<Integer, String>() {
                        @Override
                        public String apply(Integer integer) throws Exception {
                            return String.valueOf(integer);
                        }
                    })
    

    查看map的实现:

        public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    

    和just几乎一样,只不过onAssembly的参数是ObservableMapObservableMap和上面的ObservableJust结构一样,只不过这里保存的不再是value,而是一个Function,我们需要关注的也是这个方法:

      @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
    

    订阅的时候会调用这个方法,这里new了一个MapObserver,并和上游完成订阅,在这里订阅的时候,依然会调用MapObserver的onNext方法。而onNext方法等于一个代理,调用了下游订阅的Observe,也就是代码里写的那个Observe

     @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != NONE) {
                    downstream.onNext(null);
                    return;
                }
    
                U v;
    
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                downstream.onNext(v);
            }
    

    整个流程就是,把原来的Observable转化为ObservableMap,然后在订阅的时候,为ObservableMap添加一个订阅者MapObserver,接受上游的数据,在这个订阅者里面转化数据并回调给原来的Observe

    disposed取消订阅

    取消分为以下三种情况:

    1. 无延迟无后续,single.just之类,只发射一个数据;
    2. 有延迟无后续,delay之类,延迟发射一次数据;
    3. 有延迟有后续,interval之类,轮询发射数据;

    无延迟无后续

            Single.just(1)
                    .subscribe(new SingleObserver<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onSuccess(Integer integer) {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
                    });
    

    这个订阅事件只发射一次,并且没有延迟,上面我们看过subscribeActual,知道onSubscribe的参数是哪一个,下面查看代码,看看取消做了什么:

        @Override
        protected void subscribeActual(SingleObserver<? super T> observer) {
            observer.onSubscribe(Disposables.disposed());
            observer.onSuccess(value);
        }
    

    点进去查看disposed(),返回的是EmptyDisposable.INSTANCE,查看它的取消:

       @Override
        public void dispose() {
            // no-op
        }
    

    什么都没做。因为整个事件只有一个数据,订阅就发射了,并没有后续事件。所以事件序列已经完成,不需要我们手动取消了。

    有延迟不后续

            Single.just(1)
                    .delay(1,TimeUnit.SECONDS)
                    .subscribe(new SingleObserver<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            
                        }
    
                        @Override
                        public void onSuccess(Integer integer) {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
                    });
    

    根据前面map的代码分析,return delay(time, unit, Schedulers.computation(), false);可是看到是自动切了线程,然后我们找到SingleDelay,查看代码:

    
        @Override
        protected void subscribeActual(final SingleObserver<? super T> observer) {
    
            final SequentialDisposable sd = new SequentialDisposable();
            observer.onSubscribe(sd);
            source.subscribe(new Delay(sd, observer));
        }
    

    SingleDelay被下面的内部类Delay订阅,这个SequentialDisposable就是实际的订阅时候的Disposable。它其实是个引用:

    
            @Override
            public void onSubscribe(Disposable d) {
                sd.replace(d);
            }
    
            @Override
            public void onSuccess(final T value) {
                sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
            }
    
            @Override
            public void onError(final Throwable e) {
                sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
            }
    

    在订阅的时候,replace的是上游的Disposable,因为这个方法是上游调用并赋值的,然后,在onSuccessonError的时候,又一次replace,也就是说,在订阅发生,数据还在延迟的时候,是取消的上游,在收到后,是取消的下游。

    有延迟有后续

    Observable.interval(1, TimeUnit.SECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            
                        }
    
                        @Override
                        public void onNext(Long aLong) {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    根据前面delay的解析,先进入interval函数查看,这里看到会自动切线程,然后一个新建的ObservableInterval,进入查看:

        @Override
        public void subscribeActual(Observer<? super Long> observer) {
            IntervalObserver is = new IntervalObserver(observer);
            observer.onSubscribe(is);
    
            Scheduler sch = scheduler;
    
            if (sch instanceof TrampolineScheduler) {
                Worker worker = sch.createWorker();
                is.setResource(worker);
                worker.schedulePeriodically(is, initialDelay, period, unit);
            } else {
                Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
                is.setResource(d);
            }
        }
    

    第二行的observer就是下游写的订阅者,onSubscribe传递的参数就是第一行new的IntervalObserver,那么它一定是个Disposable,果不其然:

     static final class IntervalObserver
        extends AtomicReference<Disposable>
        implements Disposable, Runnable {
    

    但是看到这里,又疑惑了,这里类似线程的原子操作:

        @Override
            public void dispose() {
                DisposableHelper.dispose(this);
            }
    
    
        public static boolean dispose(AtomicReference<Disposable> field) {
            Disposable current = field.get();
            Disposable d = DISPOSED;
            if (current != d) {
                current = field.getAndSet(d);
                if (current != d) {
                    if (current != null) {
                        current.dispose();
                    }
                    return true;
                }
            }
            return false;
        }
    

    其实IntervalObserver继承了AtomicReference<Disposable>,又实现了Disposable,也就说,它可以用AtomicReference代理别的Disposable,也可以用自己的Disposable。它既可以歌手,也可以是个经纪人。
    回到上面继续看subscribeActual

                Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
                is.setResource(d);
    

    第一行有一个d,然后setResource,这个d就是IntervalObserver实际代理的Disposable。第一行的代码执行了切换线程,和向下游发射数据,所以取消,也就是取消了上游的发射数据,让上游停止发射数据。再看它的另一个方法:

        @Override
            public void run() {
                if (get() != DisposableHelper.DISPOSED) {
                    downstream.onNext(count++);
                }
            }
    

    downstream就是传是入的下游订阅者,在这里判断,如果DISPOSED,就不调用downstream.onNext(count++);,也就是下游接收不到了。

    既取消了上游的发射,也取消了下游的接收。

    相关文章

      网友评论

        本文标题:Rxjava订阅和取消流程分析

        本文链接:https://www.haomeiwen.com/subject/pqslphtx.html