美文网首页
RxJava 订阅流程源码解析

RxJava 订阅流程源码解析

作者: Wynne丶XXW | 来源:发表于2019-03-03 18:34 被阅读0次

    简单示例

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d("XXW", "subscribe");
                    emitter.onNext(1);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer s) {
                    LogUtil.d("onNext " + s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    打印结果

    D/XXW: onSubscribe  Thread Name : main
    D/XXW: subscribe
    D/XXW: subscribe Thread Name : RxCachedThreadScheduler-1
    D/XXW: onNext 1
    D/XXW: onNext 2
    D/XXW: onNext 3
    D/XXW: onNext 4
    

    RxJava 之所以在我们Android开发者的圈子 如此火爆 主要是因为它的链式结构,切换线程之简便而闻名. 所以学习它的源码也是很有必要的事情

    订阅流程

    从基本的RxJava使用套路, 我们可以发现 是被观察者订阅观察者, 直白一点就是说, 被观察者有动作之后就会被发送给观察者, 所以我们从代码的角度的话 就先去看看subscribe(observer observer)这个方法

    
    public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
                 //抽象方法 进行订阅的关键方法
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
    
    //Observable类 的抽象方法
    protected abstract void subscribeActual(Observer<? super T> observer)
    
    1. 可以看出subscribe方法 是一个被观察者(Observable) 的方法, 而且 这个方法只做了一件事 , 订阅观察者(observer) . 但是这个方法又是一个抽象方法, 从面向对象的角度出发, 那么必然是子类实现这个方法进行处理了. 所以我们先要找到这个子类. 所以回到RxJava基本代码来找这个子类
    //通过构建者模式,返回一个Observable对象
     Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d("XXW", "subscribe");
                    Log.d("XXW", "subscribe Thread Name : " + Thread.currentThread().getName());
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onNext(4);
                    emitter.onComplete();
                }
            })
    

    通常,我们会在create方法, 传入一个ObservableOnSubscribe 接口的参数. 因为ObservableOnSubscirbe是一个接口, 所以要看create方法

    //create方法会返回Observable类
     public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    
    //进行
     public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
    

    create方法 直接返回RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)),所以我接着看这个方法内部如何实现,onAssembly 方法里 对onObservableAssembly 进行了判空,如果不为空 则返回apply(f,source), 但是我们查看源码, 发现这个对象默认是空对象, 所以一般就会直接返回我们的ObservableCreate 对象, 我们再看看ObservableCreate类的实现

    
    public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            1.包装Observer类,进CreateEmitter类
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            2.调用observer的onSubscribe方法
            observer.onSubscribe(parent);
    
            try {
            3.调用subscribe方法
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    这个时候我们发现Observable是Observable的子类, 我们之前查看subscribe的源码发现, 它会调用子类的subscribeActual方法. 我们来看subscribeActual做了如上3点, 所以之前在示例中,我们先打印的会是OnSubscibe方法里的Log, 因为之前从create方法传进来的就是我们ObservableOnSubscribe接口, 所以在这里他会调用自己的回调方法subscibe. 我们在外面回调方法中就会通过CreateEmitter这个包装类,进行onNext,onComplete操作. 所以我们这时候要先看看CreateEmitter类.

    static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
    
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
    
            //传进Observer对象
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
    
            //ObservableOnSubscibe 回调方法中调用的oNext
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            //ObservableOnSubscibe 回调方法中调用的oNext
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
    
            @Override
            public void setDisposable(Disposable d) {
                DisposableHelper.set(this, d);
            }
    
            @Override
            public void setCancellable(Cancellable c) {
                setDisposable(new CancellableDisposable(c));
            }
    
            @Override
            public ObservableEmitter<T> serialize() {
                return new SerializedEmitter<T>(this);
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
        }
    
    

    这时候我们发现, 平常我们调用的onNext等方法 其实都是走的这个包装类CreateEmitter类, 我们发现这个类实现了Disposable的接口并继承了AtomicReference类. 我们平时如果要切断 rxjava的消息, 一般都是调用Disposeable的dispose方法. 所以我们先看看dispose方法在CreateEmitter的实现,

    DisposableHelper.dispose(this);
    
     public static boolean dispose(AtomicReference<Disposable> field) {
            //1.当前Disposeable
            Disposable current = field.get();
            //2. 断开的Disposeable
            Disposable d = DISPOSED;
            //如果当前Disposeable不是断开
            if (current != d) {
            //将当前的disposeable 设置为断开
                current = field.getAndSet(d);
                if (current != d) {
                    if (current != null) {
                        current.dispose();
                    }
                    return true;
                }
            }
            return false;
        }
    
     public static boolean isDisposed(Disposable d) {
            return d == DISPOSED;
        }
    

    当我们调用dispose方法后, 会判断当前是否断开订阅, 如果订阅了, 如果订阅了则会调用断开连接, 其中的原子性设置 我没有太明白, 以后弄明白再补充, , isDispose方法就很明白了, 来判断我们是否已经断开. 所以当我们上游调用了onNext方法

     public void onNext(T t) {
                 //先判空
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                //消息没有切断才会调用onbserver的onNext方法把消息发送到下游
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    

    关于onError()和 onComplete() 为何只会执行一个, 源码如下

     @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
    

    通过源码 我们可以发现这两个方法 调用方式都是一样的, onError只是多了一个异常的判断.
    因为之前我们分析dispose方法后 我们可以知道, 调用过这个dispose方法后, 会切断消息, 所以!isDisposed方法肯定就会返回false了, 所以这就只有一个方法能执行的原因.

    总结:

    通过上面一系列分析,我们就能大概明白了 RxJava的订阅流程了, 会发现大神写的代码 封装性,扩展性都特别好, 而且设计模式也用了很多, (构建者, 装饰模式) 这些在我们平时写代码的时候也可以使用来提升我们技术!

    相关文章

      网友评论

          本文标题:RxJava 订阅流程源码解析

          本文链接:https://www.haomeiwen.com/subject/ochtuqtx.html