RxJava->doOnNext()

Author: 冉桓彬 | Published 2017-09-13 19:39 | 5,773 views

    Example:

    Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                LogUtils.log(Test01.class, "onNext()->1");
                emitter.onNext(1);
                LogUtils.log(Test01.class, "subscribe()->2");
                emitter.onNext(2);
                LogUtils.log(Test01.class, "subscribe()->3");
                emitter.onNext(3);
                LogUtils.log(Test01.class, "subscribe()->onComplete()");
                emitter.onComplete();
            }
        })
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.log(Test01.class, "accept()->integer:" + integer);
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                sDisposable = disposable;
                LogUtils.log(Test01.class, "onSubscribe()");
            }

            @Override
            public void onNext(Integer value) {
                LogUtils.log(Test01.class, "onNext()->value:" + value);
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.log(Test01.class, "onError()");
            }

            @Override
            public void onComplete() {
                LogUtils.log(Test01.class, "onComplete()");
            }
        });
    

    doOnNext():

    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            LogUtils.log(Test01.class, "accept()->integer:" + integer);
        }
    })
    
    public interface Consumer<T> {
        void accept(T t) throws Exception;
    }
    
    public abstract class Observable<T> implements ObservableSource<T> {
        @SchedulerSupport(SchedulerSupport.NONE)
        public final Observable<T> doOnNext(Consumer<? super T> onNext) {
            return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
        }
    
        @SchedulerSupport(SchedulerSupport.NONE)
        private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
            return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
        }
    }
    
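    public final class RxJavaPlugins {
        public static <T> Observable<T> onAssembly(Observable<T> source) {
            // Simplified: with no assembly hook installed, the source is returned as-is.
            return source;
        }
    }

    // Class hierarchy:
    //   ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T>
    //   AbstractObservableWithUpstream<T, U> extends Observable<U>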
    
    • 1. doOnNext() returns an ObservableDoOnEach object, so the subscribe() call that follows is made on that ObservableDoOnEach.
    new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate)
    
    public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    
        public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                                  Consumer<? super Throwable> onError,
                                  Action onComplete,
                                  Action onAfterTerminate) {
            super(source);
        }
    }
    
    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
        protected final ObservableSource<T> source;
        AbstractObservableWithUpstream(ObservableSource<T> source) {
            this.source = source;
        }
    }
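
    The wrapping step above can be sketched without RxJava. This is a minimal illustration under stated assumptions: MiniSource and DoOnNextSource are hypothetical stand-ins for Observable and ObservableDoOnEach, and java.util.function.Consumer stands in for RxJava's Consumer. It shows that doOnNext() only builds a wrapper holding the upstream, and that nothing is emitted until subscribe() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of the assembly step; none of these types are RxJava classes.
class AssemblySketch {
    /** Stand-in for Observable: anything that can push values into a consumer. */
    static abstract class MiniSource<T> {
        abstract void subscribe(Consumer<? super T> downstream);

        /** Like doOnNext(): does no work now, just returns a wrapper around this. */
        final MiniSource<T> doOnNext(Consumer<? super T> hook) {
            return new DoOnNextSource<>(this, hook);
        }
    }

    /** Stand-in for ObservableDoOnEach: remembers its upstream and the hook. */
    static final class DoOnNextSource<T> extends MiniSource<T> {
        final MiniSource<T> source;      // upstream, as in AbstractObservableWithUpstream
        final Consumer<? super T> hook;

        DoOnNextSource(MiniSource<T> source, Consumer<? super T> hook) {
            this.source = source;
            this.hook = hook;
        }

        @Override
        void subscribe(Consumer<? super T> downstream) {
            // Subscribe upstream with a consumer that runs the hook first.
            source.subscribe(value -> {
                hook.accept(value);
                downstream.accept(value);
            });
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSource<Integer> root = new MiniSource<Integer>() {
            @Override
            void subscribe(Consumer<? super Integer> downstream) {
                downstream.accept(1);
                downstream.accept(2);
            }
        };
        MiniSource<Integer> wrapped = root.doOnNext(v -> log.add("hook:" + v));
        log.add("wrapperHoldsRoot:" + (((DoOnNextSource<Integer>) wrapped).source == root));
        log.add("assembled");            // nothing has been emitted yet
        wrapped.subscribe(v -> log.add("onNext:" + v));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    The "assembled" entry is logged before any "hook" entry, which is the point: the operator call only records the chain, and the work happens at subscription.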
    

    Once you can write code with this kind of structure yourself, you have really leveled up.

    • 1. subscribe() is called on ObservableDoOnEach, but ObservableDoOnEach in turn holds a reference to the upstream ObservableCreate.

    subscribe():

    public abstract class Observable<T> implements ObservableSource<T> {
        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            subscribeActual(observer);
        }
        protected abstract void subscribeActual(Observer<? super T> observer);
    }
    
    • subscribeActual() is abstract, so the call lands in the subclass implementation, here ObservableDoOnEach:
    public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
        @Override
        public void subscribeActual(Observer<? super T> t) {
            source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
        }
    }
    
    • The call still ends up in ObservableCreate.subscribeActual(), but the Observer passed to it is a DoOnEachObserver:
    static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> actual;   // the downstream observer
        Disposable s;
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                actual.onSubscribe(this);
            }
        }
    }
    
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
            source.subscribe(parent);
        }
    }
    

    This code appears to use the adapter pattern.

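    • 1. The Observer that ObservableCreate receives is actually the DoOnEachObserver.
    • 2. observer.onSubscribe(parent) hands the CreateEmitter to the DoOnEachObserver, which stores it in its Disposable s field, so s actually points to the CreateEmitter.
    • 3. In ObservableDoOnEach, source points to the ObservableCreate, so source.subscribe() makes the DoOnEachObserver the Observer used inside ObservableCreate.
    • 4. actual.onSubscribe(this) hands the DoOnEachObserver itself to the downstream Observer as its Disposable.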
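
    The four reference relations in the list above can be checked with a small model. This is only a sketch: MiniEmitter, HookObserver, and EndObserver are hypothetical names playing the roles of CreateEmitter, DoOnEachObserver, and the user's Observer, and wire() imitates what ObservableCreate.subscribeActual() does.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical miniature of the subscribe-time wiring; not RxJava's real classes.
class WiringSketch {
    interface MiniDisposable { void dispose(); boolean isDisposed(); }

    interface MiniObserver<T> {
        void onSubscribe(MiniDisposable d);
        void onNext(T value);
    }

    /** Plays the role of CreateEmitter: wraps an observer and is the root Disposable. */
    static final class MiniEmitter<T> implements MiniDisposable {
        final MiniObserver<? super T> observer;
        private final AtomicBoolean disposed = new AtomicBoolean();
        MiniEmitter(MiniObserver<? super T> observer) { this.observer = observer; }
        void onNext(T value) { if (!isDisposed()) observer.onNext(value); }
        @Override public void dispose() { disposed.set(true); }
        @Override public boolean isDisposed() { return disposed.get(); }
    }

    /** Plays the role of DoOnEachObserver: an Observer upstream, a Disposable downstream. */
    static final class HookObserver<T> implements MiniObserver<T>, MiniDisposable {
        final MiniObserver<? super T> actual;
        MiniDisposable s;
        HookObserver(MiniObserver<? super T> actual) { this.actual = actual; }
        @Override public void onSubscribe(MiniDisposable d) {
            this.s = d;                 // point 2: s is the emitter
            actual.onSubscribe(this);   // point 4: downstream gets this wrapper
        }
        @Override public void onNext(T value) { actual.onNext(value); }
        @Override public void dispose() { s.dispose(); }
        @Override public boolean isDisposed() { return s.isDisposed(); }
    }

    /** Records which Disposable the final observer was handed. */
    static final class EndObserver implements MiniObserver<Integer> {
        MiniDisposable received;
        @Override public void onSubscribe(MiniDisposable d) { received = d; }
        @Override public void onNext(Integer value) { }
    }

    /** Returns true when the reference relations from the list hold. */
    static boolean wire() {
        EndObserver end = new EndObserver();
        HookObserver<Integer> hook = new HookObserver<>(end);
        // What ObservableCreate.subscribeActual() does, in miniature:
        MiniEmitter<Integer> emitter = new MiniEmitter<>(hook);
        hook.onSubscribe(emitter);
        return emitter.observer == hook    // points 1 and 3
            && hook.s == emitter           // point 2
            && end.received == hook;       // point 4
    }

    public static void main(String[] args) {
        System.out.println("wiring as described: " + wire());
    }
}
```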

    static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
    
            CreateEmitter(Observer<? super T> observer) {...}
    
            @Override
            public void onNext(T t) {...}
    
            @Override
            public void onError(Throwable t) {...}
    
            @Override
            public void onComplete() {...}
    
            @Override
            public void setDisposable(Disposable d) {...}
    
            @Override
            public void setCancellable(Cancellable c) {...}
    
            @Override
            public ObservableEmitter<T> serialize() {...}
    
            @Override
            public void dispose() {...}
    
            @Override
            public boolean isDisposed() {...}
    }
    
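    • 1. So every time the source calls onError(), onNext(), or onComplete() on the emitter, the call first goes to the corresponding CreateEmitter method, which then calls the corresponding DoOnEachObserver method.
    • 2. The Disposable handed to the final Observer is the DoOnEachObserver, so dispose() and isDisposed() go through DoOnEachObserver's methods. Its internal Disposable s points to the CreateEmitter, so in the end it is the CreateEmitter that decides whether the observer still receives events.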
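
    A small model makes the dispose path concrete. It is a sketch under assumptions: Emitter, Forwarder, Handle, and Sink are hypothetical names, with Emitter in the role of CreateEmitter and Forwarder in the role of DoOnEachObserver. The final observer disposes through the handle it was given, and the flag that actually stops delivery lives in the emitter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature; names are illustrative, not RxJava's.
class DisposeSketch {
    interface Handle { void dispose(); boolean isDisposed(); }
    interface Sink { void onSubscribe(Handle h); void onNext(int value); }

    /** Root of the chain, like CreateEmitter: owns the real disposed flag. */
    static final class Emitter implements Handle {
        private final Sink sink;
        private volatile boolean disposed;
        Emitter(Sink sink) { this.sink = sink; }
        void onNext(int value) {
            if (!disposed) sink.onNext(value);   // the gate that decides delivery
        }
        @Override public void dispose() { disposed = true; }
        @Override public boolean isDisposed() { return disposed; }
    }

    /** Middle of the chain, like DoOnEachObserver: forwards dispose() upstream. */
    static final class Forwarder implements Sink, Handle {
        private final Sink actual;
        private Handle upstream;
        Forwarder(Sink actual) { this.actual = actual; }
        @Override public void onSubscribe(Handle h) { upstream = h; actual.onSubscribe(this); }
        @Override public void onNext(int value) { actual.onNext(value); }
        @Override public void dispose() { upstream.dispose(); }
        @Override public boolean isDisposed() { return upstream.isDisposed(); }
    }

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        Sink end = new Sink() {
            private Handle handle;
            @Override public void onSubscribe(Handle h) { handle = h; }
            @Override public void onNext(int value) {
                received.add(value);
                if (value == 2) handle.dispose();   // calls Forwarder, which calls Emitter
            }
        };
        Forwarder middle = new Forwarder(end);
        Emitter emitter = new Emitter(middle);
        middle.onSubscribe(emitter);
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);                          // dropped: the emitter is disposed
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```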

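    Summary:

    • 1. The source shows that the accept() passed to doOnNext() is simply called right before the downstream Observer's onNext(). It does not change the value, and the Observer's own callback does not depend on it, as long as accept() does not throw.
    • 2. doOnNext() is therefore a good place for side effects, such as logging or preparatory work, that must run before the Observer's onNext().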
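
    The "hook first, then the observer" rule can be sketched as a single wrapper. The names below (Sink, withHook) are hypothetical. The error branch reflects how RxJava 2 appears to behave, where an exception thrown by accept() is delivered to onError() and the pending onNext() is skipped; treat that part as an assumption of this sketch. It also omits disposing the upstream and catches only RuntimeException, because java.util.function.Consumer cannot throw checked exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of a doOnNext-style hook; not RxJava's implementation.
class HookOrderSketch {
    interface Sink<T> { void onNext(T value); void onError(Throwable e); }

    /** Runs hook before forwarding; a throwing hook ends the stream with onError. */
    static <T> Sink<T> withHook(Consumer<? super T> hook, Sink<? super T> actual) {
        return new Sink<T>() {
            private boolean done;
            @Override public void onNext(T value) {
                if (done) return;
                try {
                    hook.accept(value);
                } catch (RuntimeException e) {
                    onError(e);
                    return;                 // downstream onNext is skipped
                }
                actual.onNext(value);
            }
            @Override public void onError(Throwable e) {
                if (done) return;
                done = true;
                actual.onError(e);
            }
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Sink<Integer> end = new Sink<Integer>() {
            @Override public void onNext(Integer v) { log.add("onNext:" + v); }
            @Override public void onError(Throwable e) { log.add("onError:" + e.getMessage()); }
        };
        Sink<Integer> hooked = withHook(v -> {
            if (v == 2) throw new IllegalStateException("bad value");
            log.add("accept:" + v);
        }, end);
        hooked.onNext(1);
        hooked.onNext(2);
        hooked.onNext(3);                   // ignored: the stream already failed
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```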

    Now try chaining several doOnNext() calls:

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    LogUtils.log(Test01.class, "subscribe->onNext()->1");
                    emitter.onNext(1);
                    LogUtils.log(Test01.class, "subscribe()->onNext()->2");
                    emitter.onNext(2);
                    LogUtils.log(Test01.class, "subscribe()->onNext()->3");
                    emitter.onNext(3);
                    LogUtils.log(Test01.class, "subscribe()->onComplete()");
                    emitter.onComplete();
                }
            })
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    LogUtils.log(Test01.class, "accept()_1->integer:" + integer);
                }
            })
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    LogUtils.log(Test01.class, "accept()_2->integer:" + integer);
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable disposable) {
                    sDisposable = disposable;
                    LogUtils.log(Test01.class, "onSubscribe()");
                }
    
                @Override
                public void onNext(Integer value) {
                    LogUtils.log(Test01.class, "onNext()->value:" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    LogUtils.log(Test01.class, "onError()");
                }
    
                @Override
                public void onComplete() {
                    LogUtils.log(Test01.class, "onComplete()");
                }
            });
    

    The output is as follows:

    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onSubscribe()
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe->onNext()->1
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_1->integer:1
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_2->integer:1
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onNext()->value:1
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe()->onNext()->2
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_1->integer:2
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_2->integer:2
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onNext()->value:2
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe()->onNext()->3
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_1->integer:3
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_2->integer:3
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onNext()->value:3
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe()->onComplete()
    09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onComplete()
    

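    Let's use the source code to see why the output comes out in this order.
    The analysis below may feel convoluted and long-winded, which itself reflects how complex RxJava's architecture is.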

    public abstract class Observable<T> implements ObservableSource<T> {
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    }
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    }
    
    • 1. The Observable reference now points to an ObservableCreate, which holds a reference to the ObservableOnSubscribe.
    public abstract class Observable<T> implements ObservableSource<T> {
        public final Observable<T> doOnNext(Consumer<? super T> onNext) {
            return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
        }
        private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
            return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
        }
    }
    public final class RxJavaPlugins {
        public static <T> Observable<T> onAssembly(Observable<T> source) {
            // Simplified: with no assembly hook installed, the source is returned as-is.
            return source;
        }
    }
    public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
        final Consumer<? super T> onNext;
        public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                                  Consumer<? super Throwable> onError,
                                  Action onComplete,
                                  Action onAfterTerminate) {
            super(source);
            this.onNext = onNext;
        }
    }
    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
        protected final ObservableSource<T> source;
        AbstractObservableWithUpstream(ObservableSource<T> source) {
            this.source = source;
        }
    }
    
    • 1. After the first doOnNext() call, the Observable reference points to ObservableDoOnEach_1, which receives the current Observable as its source. In other words, ObservableDoOnEach_1 holds references to ObservableCreate and Consumer_1.
    • 2. Likewise, after the second doOnNext() call, the Observable reference points to ObservableDoOnEach_2, which receives the current Observable as its source. So ObservableDoOnEach_2 holds references to ObservableDoOnEach_1 and Consumer_2.
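
    The chain built by the two doOnNext() calls is effectively a singly linked list that points upstream. The sketch below models only that shape; Node is a hypothetical class, and the node names are labels borrowed from the text, not real objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the assembled operator chain; only the links are modeled.
class ChainSketch {
    /** A node in the operator chain; source is null for the root. */
    static final class Node {
        final String name;
        final Node source;
        Node(String name, Node source) { this.name = name; this.source = source; }

        /** Mimics doOnNext(): returns a new node that points back at this one. */
        Node doOnNext(String consumerName) {
            return new Node("ObservableDoOnEach(" + consumerName + ")", this);
        }
    }

    /** Walks from the last operator back to the root, as subscribe() later will. */
    static List<String> upstreamPath(Node last) {
        List<String> path = new ArrayList<>();
        for (Node n = last; n != null; n = n.source) path.add(n.name);
        return path;
    }

    static List<String> run() {
        Node chain = new Node("ObservableCreate", null)
                .doOnNext("Consumer_1")
                .doOnNext("Consumer_2");
        return upstreamPath(chain);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    The reference the caller ends up holding is the last node, which is why subscribe() starts at ObservableDoOnEach_2 and works its way back to ObservableCreate.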

    Next, see how subscribe(...) makes the chained doOnNext() calls work:

    public abstract class Observable<T> implements ObservableSource<T> {
        public final void subscribe(Observer<? super T> observer) {
            subscribeActual(observer);
        }
        protected abstract void subscribeActual(Observer<? super T> observer);
    }
    
    • The key is subscribeActual(); for the single-threaded operators covered later, this is the only method we need to look at.
    public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
        public void subscribeActual(Observer<? super T> t) {
            source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
        }
    }
    
    • 1. subscribeActual() is first invoked on ObservableDoOnEach_2, where source is ObservableDoOnEach_1 and onNext is Consumer_2.

    • 2. subscribe(...) passes DoOnEachObserver_2 to ObservableDoOnEach_1; by the same step, DoOnEachObserver_1 is then passed to ObservableCreate.

    • 3. The Observer actual held by DoOnEachObserver_2 is the Observer we created ourselves with new Observer.

    • 4. The Observer argument that ObservableDoOnEach_1 receives in subscribeActual(...) is the DoOnEachObserver_2 created in ObservableDoOnEach_2's subscribeActual(...), so the actual field inside DoOnEachObserver_1 points to DoOnEachObserver_2.
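
    The recursion in the list above can be modeled in a few lines. This is a sketch with hypothetical classes Op (an operator with a source) and Obs (an observer with an actual); it records the order in which the wrapper observers are created and then follows the actual links from the observer that reaches the root.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the subscribeActual() recursion; not RxJava's classes.
class SubscribeSketch {
    /** An observer in the chain; actual is the next observer downstream (null at the end). */
    static final class Obs {
        final String name;
        final Obs actual;
        Obs(String name, Obs actual) { this.name = name; this.actual = actual; }
    }

    /** An operator; source is its upstream (null for the root). */
    static final class Op {
        final String id;
        final Op source;
        Op(String id, Op source) { this.id = id; this.source = source; }

        /** Returns the observer that finally reaches the root source. */
        Obs subscribe(Obs downstream, List<String> created) {
            if (source == null) return downstream;         // root: ObservableCreate
            Obs wrapper = new Obs("DoOnEachObserver_" + id, downstream);
            created.add(wrapper.name);
            return source.subscribe(wrapper, created);     // recurse upstream
        }
    }

    static List<String> run() {
        Op create = new Op("create", null);
        Op op1 = new Op("1", create);
        Op op2 = new Op("2", op1);
        List<String> log = new ArrayList<>();
        Obs atRoot = op2.subscribe(new Obs("userObserver", null), log);
        // Follow the actual links from the observer the root received.
        StringBuilder links = new StringBuilder();
        for (Obs o = atRoot; o != null; o = o.actual) {
            links.append(o.name);
            if (o.actual != null) links.append(" -> ");
        }
        log.add(links.toString());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    The wrappers are created from the last operator backwards (2, then 1), while the resulting observer chain runs forwards (1, then 2, then the user's observer).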

    • Then move on to ObservableCreate:

    public final class ObservableCreate<T> extends Observable<T> {
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
            source.subscribe(parent);
        }
    }
    
    • 1. CreateEmitter holds DoOnEachObserver_1 as its Observer, DoOnEachObserver_1 holds the CreateEmitter as its Disposable, and the ObservableOnSubscribe is handed the CreateEmitter in subscribe().
    static final class CreateEmitter<T> {
        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }
    }
    static final class DoOnEachObserver<T> {
        @Override
        public void onNext(T t) {
            onNext.accept(t);
            actual.onNext(t);
        }
    }
    
    • Each time CreateEmitter.onNext() is called, DoOnEachObserver_1.onNext() runs.
    • There, onNext.accept(t) is actually Consumer_1.accept(t).
    • Because actual in DoOnEachObserver_1 refers to DoOnEachObserver_2, actual.onNext(t) calls DoOnEachObserver_2.onNext(t). Inside DoOnEachObserver_2, the onNext field is Consumer_2 and actual is the Observer we created with new Observer.
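
    The call order traced above can be reproduced with two nested wrappers. The sketch below uses hypothetical names (Sink, hooked) and leaves out onSubscribe() and error handling; the log strings are copied from the example so the result can be compared with the printed output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature reproducing the call order; not RxJava itself.
class EmissionOrderSketch {
    interface Sink<T> { void onNext(T value); void onComplete(); }

    /** Like DoOnEachObserver.onNext(): hook first, then the next observer. */
    static <T> Sink<T> hooked(Consumer<? super T> hook, Sink<? super T> actual) {
        return new Sink<T>() {
            @Override public void onNext(T value) {
                hook.accept(value);
                actual.onNext(value);
            }
            @Override public void onComplete() { actual.onComplete(); }
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Sink<Integer> user = new Sink<Integer>() {
            @Override public void onNext(Integer v) { log.add("onNext()->value:" + v); }
            @Override public void onComplete() { log.add("onComplete()"); }
        };
        // Built from the end of the chain backwards, as subscribe() does.
        Sink<Integer> observer2 = hooked(v -> log.add("accept()_2->integer:" + v), user);
        Sink<Integer> observer1 = hooked(v -> log.add("accept()_1->integer:" + v), observer2);
        // The emitter talks only to observer1.
        for (int i = 1; i <= 2; i++) observer1.onNext(i);
        observer1.onComplete();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    For each value the log shows accept()_1, then accept()_2, then onNext(), matching the order in the printed output.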

    [Figures: diagrams summarizing the reference chain described above]
