美文网首页工作生活
从源码分析RxJava订阅过程

从源码分析RxJava订阅过程

作者: 有没有口罩给我一个 | 来源:发表于2019-06-30 23:38 被阅读0次
    wdroid.jpg

    都知道观察模式吧?

    在开始之前让我们简单了解一下观察模式,就是某对象A的变化引起其他多个对象B变化,但是前提是你需要去订阅我,打个比方:就是我的状态发生了改变,那我怎么通知你呢?所以我需要知道的如何去通知其他对象说我这里已经改变了,你看看那需不需要做出改变。就比如微信的订阅号,如果你不订阅,那该订阅号在发布内容也不会通知,这里的订阅号就是被观察者,而用户就是观察者。那怎么说让这两者关联来呢?前面说的订阅号是要提供一个接口,允许用户去订阅的,所以最后就是被观察者和观察者两个都得提供接口,订阅号提供的接口让用户去订阅类比微信号,当订阅号发布内容,就通过这个微信号通知观察者,所以订阅就是这两者的关联点。

    开始之前的两个重要的类或接口:ObservableObserver

    • Observable 它实现ObservableSource接口,通俗来讲Observable就是一个被观察者也有人叫可观察的资源,这里就叫被观察者;
    • Observer 观察者;
      涉及的类:


      RxJava2.png

    订阅流程分析

    开始RxJava的订阅流程分析之前,来个简单的栗子,代码如下:

    Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("发射 subscribe");
                emitter.onComplete();
            }
        });//ObservableCreate
    
        Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io());//1
    
        observableSubscribeOn.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                showLog("onSubscribe");
            }
    
            @Override
            public void onNext(String s) {
                showLog("onNext");
            }
    
            @Override
            public void onError(Throwable e) {
                showLog("onError");
            }
    
            @Override
            public void onComplete() {
                showLog("onComplete");
            }
        });
    

    日志结果:

    onSubscribe ,Thread: main
    onNext ,Thread: RxNewThreadScheduler-3
    onComplete ,Thread: RxNewThreadScheduler-3
    

    如上代码,之所以分开来写是为了更清晰的去理解每一步RxJava生成的相关类。

    如果你认真看前面的内容,你一下就明白Observable.subscribe()方法也就是订阅的意思,是 ObservableObserver 的关联点,也就是被观察者和观察者的关联点,所以我们的分析就从Observable.subscribe(Observer observer)方法开始代码如下:

     public final void subscribe(Observer<? super T> observer) {
        try {
    
            // .....此处省略几亿代码....
    
            //此方法在Observable类是中是抽象的,注定是子类实现
            subscribeActual(observer);
    
            // .....此处省略几亿代码....
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
           // .....此处省略几亿代码....
        }
    }
    
    • 上面代码不难理解在subscribe方法中直接就调用了subscribeActual(observer)方法,我可以翻译为 实际订阅;
    • subscribe方法是Observable类的方法,他是抽象类,传入了一个 Observer 对象,开始的时候栗子我们可以知道Observable是通过我们调用Observable.create(ObservableOnSubscribe) 所创建出来的;
    • 那subscribeActual在Observable中是抽象方法,肯定是子类去实现了该方法,从第二点知道子类肯定是在Observable.create(ObservableOnSubscribe)中给new出来的,那么接下我们看看Observable.create(ObservableOnSubscribe)方法的实现;

    // Observable.create(ObservableOnSubscribe)

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        // .....此处省略几亿代码....
    
        // 直接就创建了ObservableCreate,并把source作为参数传进去
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

    // onAssembly

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    

    我们从上面代码我们知道在Observable.create(ObservableOnSubscribe)中直接就创建了ObservableCreate,而ObservableCreate是Observable的子类,并把source作为参数传进去,最后调用RxJavaPlugins.onAssembly方法,我们默认返回ObservableCreate实例,所以Observable.create方法最后返回的是ObservableCreate实例,所以就验证了上面的第三点实际调用的是ObservableCreate.subscribeActual(observer)方法,这是在不考虑其他变换和线程切换的情况,那我们就来看看ObservableCreate.subscribeActual(observer)方法的实现,代码如下:

     @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //事件发射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //直接回调Observer的onSubscribe方法,这个方法是和线程切换无关,只在当前的线程中执行
        observer.onSubscribe(parent);
    
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    代码不多,也很好理解:

    • 首先调用observer.onSubscribe(parent)方法通知Observer已经订阅成功了。
    • 最后调用source.subscribe(parent)方法完成订阅,source又是什么呢?我们知道在ObservableCreate是在Observable.create方法时创建的,并把ObservableOnSubscribe传进来,所以source就是ObservableOnSubscribe,直接回调ObservableOnSubscribe.subscribe方法并把CreateEmitter作为参数传递进去,之后再我们是栗子中通过这个对象调研onNext方法或者onComplete方法发射事件;

    看一下CreateEmitter的实现,代码如下:

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    
        //通过构造方法注入 观察者实例
        final Observer<? super T> observer;
    
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
    
       // .....此处省略几亿代码....
    
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    
       // .....此处省略几亿代码....
    }
    

    为了简介清晰我删掉很多无关代码,只保留onNext等这些相关的方法。

    • 其实CreateEmitter是Observable的静态内部类,
    • 在上面我们知道Observable.subscribeActual方法中创建了CreateEmitter实例并将Observer作为参数通过构造方法注入Observer实例,作为CreateEmitter的成员变量;
    • 之后在subscribeActual方法中调用ObservableOnSubscribe.subscribe的方法并把CreateEmitter实例作为方法参数传递进去;
    • 简单来说CreateEmitter的作用就是发射事件,里面分装了Observer实例,发射事件就回调到Obsever中的方法,如onNext等方法;

    有没有发现从一开始我们就仅仅讲了从Obsevable的创建到订阅,这是比较汉理解的,如果我增加一个map或线程切换呢?这里暂时不展开讲线程切换。

    重新把栗子的代码在贴一遍:

    Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("发射 subscribe");
                emitter.onComplete();
            }
        });//ObservableCreate
    
        Observable observableSubscribeOn = observableSubscribeOn.map(new Function<String, Object>() {
            @Override
            public Object apply(String s) throws Exception {
                Log.e("tag", "map");
                return "aa";
            }
        })
    
        observableSubscribeOn.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                showLog("onSubscribe");
            }
    
            @Override
            public void onNext(String s) {
                showLog("onNext");
            }
    
            @Override
            public void onError(Throwable e) {
                showLog("onError");
            }
    
            @Override
            public void onComplete() {
                showLog("onComplete");
            }
        });
    

    如上代码,订阅流程会和之前的有什么不一样呢?那么我们看个究竟,就从 Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io())开始,代码如下:

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper){
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //这里把上游this传进去也就是source,以便调用上游的subscribe方法
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
    

    从上面代码看,我们知道在map方法中创建了ObservableMap并把上游的Observable参进去了,而我们知道从Observable.subscribe方法开始订阅就会调用 subscribeActual(observer)方法,所以在Observable.subscribe之后就会调用ObservableMap的subscribbeActual方法,代码如下:

     public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    

    在ObservableMap的subscribbeActual方法中,直接调用传进来的Observable的subscribe方法又间接调用subscribbeActual方法没所以,订阅的过程实际上是一样的。

    总结

    • Observable是由上游往下游传递的,并且每个操作符都会创建新的Observable对象包裹上游的实例;
    • Observer是由下游往上游传递的,也就是从Observable.subscribe方法开始。

    相关文章

      网友评论

        本文标题:从源码分析RxJava订阅过程

        本文链接:https://www.haomeiwen.com/subject/thiicctx.html