美文网首页
RxJava2 源码分析(一) : subscribe()

RxJava2 源码分析(一) : subscribe()

作者: ShawZ | 来源:发表于2019-05-27 14:37 被阅读0次

    前言

    subscribe()是将被观察者(Observable)和观察者(Observer)连接起来的桥梁
    

    作为开篇我们首先解决三个问题:

    1. 被观察者如何发送数据
    2. 观察者如何接收数据

    一、最简单的subscribe()调用

    先从简单的开始,本篇不涉及线程切换

      Observable
                    .create(new ObservableOnSubscribe<String>() {    
                        @Override    
                        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                            emitter.onNext("Hello world!");        
                            emitter.onComplete();   
                            emitter.onError(new Throwable("error"));
                            emitter.onNext("next");
                        }
                    })
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "--onSubscribe");
                        }
    
                        @Override
                        public void onNext(String str) {
                            Log.d(TAG, "--onNext: str = " + str);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "--onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "--onComplete:");
                        }
                    });
    //发射数字1,日志打印
    onSubscribe
    onNext: str = Hello world!
    onComplete
    
    大家都知道Java的代码是一行一行执行的,所以我们首先看just操作符做了哪些事情
    
    Observable.create(ObservableOnSubscribe<T> source)
    
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //判断传入的参数是否为null,为null则抛出NullPointerException异常
        ObjectHelper.requireNonNull(source, "The source is null");
        //onAssembly是RxJavaPlugins的钩子函数,会出现很多次和类似的方法,主要是给开发者用于扩展的方法,有必要说一下
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(item));
    }
    
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        //onObservableAssembly是RxJavaPlugins的静态全局对象,默认为null,需求外部手动设置
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        //假如我们在外部设置onObservableAssembly,在这里就启了一个过滤转化的功能,具体看setOnObservableAssembly()
        if (f != null) {
            return apply(f, source);
        }
        //所以在没有手动设置的前提下,这个钩子函数,传进来什么就返回什么
        return source;
    }
    
    public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
        if (lockdown) {
            throw new IllegalStateException("Plugins can't be changed anymore");
        }
        //onObservableAssembly其实就是Function接口的实例,将我们传入的Observable对象,进行转化过滤等操作,方便我们进行扩展
        RxJavaPlugins.onObservableAssembly = onObservableAssembly;
    }
    
    //具体使用
    RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
        @Override
        public Observable apply(Observable observable) throws Exception {
            //对全局使用的ObservableJust对象,转化成Observable.empty()返回;
            if (observable instanceof ObservableJust){
                return Observable.empty();
            }
            //否则直接返回
            return observable;
        }
    });
    
    create操作符其实就是返回了一个ObservableCreate对象,继承至Observable类
    
    public final class ObservableCreate<T> extends Observable<T>{
    
       final ObservableOnSubscribe<T> source;
       public ObservableCreate(ObservableOnSubscribe<T> source) {
            //这是我们创建的匿名内部类
            this.source = source;
       }
    }
    
    接着看本文的猪脚:subscribe(Observer<? super T> observer)
    
    //我们在外部传入了observer实例
    public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                //onSubscribe也是钩子函数,与onAssembly类似,这里返回的就是observer本身
                observer = RxJavaPlugins.onSubscribe(this, observer);
                ObjectHelper.requireNonNull(observer, "...");
                //这是真正的抽象订阅方法,继承Observable的子类必须覆写此方法,所以这里调用的是ObservableCreate类里的subscribeActual
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
               ...
            }
        }
    
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //CreateEmitter是ObservableCreate的静态内部类,
        //继承AtomicReference<Disposable>类,泛型Disposable,确保数据的原子操作,后面的篇幅单独拎出来说
        //实现ObservableEmitter<T>, Disposable接口
       CreateEmitter<T> parent = new CreateEmitter<T>(observer);
       //这里打印我们的第一行日志     ----onSubscribe
       observer.onSubscribe(parent);
        try {
            //重点看这里
            source.subscribe(parent);
        } catch (Throwable ex) {    
            Exceptions.throwIfFatal(ex);    
            parent.onError(ex);
        }
    }
    
    //Emitter是发射器的意思,看到这三个接口,是不是很激动,对应了Observer观察者接口的三个方法
    //这就很容易联系到我们的问题上来,发射和接收
    public interface ObservableEmitter<T> extends Emitter<T> {...}
    public interface Emitter<T> {    
        void onNext(@NonNull T value);    
        void onError(@NonNull Throwable error);   
        void onComplete();
    }
    
    //还记得我们创建的ObservableOnSubscribe匿名内部类吗?
    new ObservableOnSubscribe<String>() {    
        @Override    
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            //发送数据
            emitter.onNext("Hello world!");     
            //发射完毕
            emitter.onComplete();    
        }
    }
    
    //ObservableEmitter类的onNext()
    //这里发射数据
    @Override
    public void onNext(T t) {    
        if (t == null) {        
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));        
            return;   
        }   
        if (!isDisposed()) {
            //observer是在构造函数中传入的,也就是我们创建的Observer匿名内部类
            //这里调用观察者对象接收数据
            //打印第二行日志   -------onNext: str = Hello world!
            observer.onNext(t);   
        }
    }
    
    public void onComplete() {
        //判断是否解除订阅 true 代表 已经解除订阅
        if (!isDisposed()) {        
            try {
                //通知观察者对象,数据发送完毕
                //打印第三行日志    ---onComplete
                observer.onComplete();        
            } finally {
                //主动解除订阅,后续发射的数据,观察者都不会接收
                //这也解释了我们的日志只打印了三行
                dispose();        
            }    
        }
    }
    
    
    

    二、总结

    1. 通过just操作符创建ObservableCreate被观察者
    2. 创建观察者Observer实例
    3. ObservableCreate通过subscribe订阅Observer观察者
    4. ObservableCreate执行subscribeActual抽象方法
    5. CreateEmitter包装Observer观察者和需要发送的值
    6. ObservableEmitter调用onNext(),onComplete()完成数据发送
        通过以上源码分析,我们很清楚的知道,数据源的发送,以及观察者接收数据的逻辑,但是单纯的只分析subscribe()方法,是很好理解,当分析一长串链式调用的时候,很容易看着看着就摸不到驴屁股了,我觉得最好的理解方法是先熟悉单个操作符的作用,了解执行内容,最后再将知识点串起来,会轻松一点,消化消化继续第二篇~

    相关文章

      网友评论

          本文标题:RxJava2 源码分析(一) : subscribe()

          本文链接:https://www.haomeiwen.com/subject/iurhtctx.html