美文网首页Android
RxJava浅析——事件如何从上游传递到下游

RxJava浅析——事件如何从上游传递到下游

作者: 青象 | 来源:发表于2017-11-29 16:41 被阅读11次

    之前学过一阵子RxJava1.x,但没应用到项目中。最近在Android上使用一个Stomp协议的库,这里面用到了RxJava2,所以重新把RxJava给捡起来了。用了一阵子之后觉得光知其然而不知其所以然比较别扭,而且作为开发总是对源码充满着好奇。虽然源码可能晦涩难懂,逻辑千回百转,但如果能从中领悟一点架构设计的精妙之处或许就是值得的。所以就开始了这一段读RxJava2源码之路。

    不在这里立任何的Flag!!!总之算是有一个开始。

    怕事情说不清楚,所以语言会比较啰嗦,请大家见谅。

    本文将会分析最简单最基础的RxJava2的源码——如何将事件从Observable(上游)传递到Observer(下游)。

    先上代码(为了能看得清楚,这里不使用lambda写法),没错,本文就是解释一下这段代码会如何执行。

    Observable
        .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("abc");
                }
            })
         .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }
                
                @Override
                public void onNext(@NonNull String s) {
                    Log.i(TAG, "onNext " + s);
                }
                
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.e(TAG, "onError", e);
                }
                
                @Override
                public void onComplete() {
                    Log.i(TAG, "onComplete");
                }
            });
    

    先把几个类罗列一下:

    • Observable:上游。事件的源头。
    • Observer:下游。事件的处理者。
    • ObservableEmitter:事件发送者。
    • Disposable:事件切断者。
    • ObservableOnSubscribe:一个接口。只是为了将ObservableEmitter返回给上游,以便发送事件。

    使用过RxJava2的童鞋应该对ObservableObserver,还有这一串链式操作非常熟悉。

    先看一下create()方法干了什么:

    //代码位置:io.reactivex.Observable.java
    //不是源码,是我简化后的代码。源码还有空检查,Hook方法的回调等。这些不影响整个逻辑,所以可以先忽略。
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            return new ObservableCreate<T>(source);
    }
    

    create()方法很简单,只给我们返回了一个ObservableCreate对象。然后把我们传入的ObservableOnSubscribe对象(source)传入了ObservableCreate的构造方法。

    ObservableOnSubscribe是一个接口。我们在create()方法的入参时给了一个匿名内部类的实现。也就是说这个ObservableOnSubscribe完全是我们外部给的实现。注意下这个subscribe()方法的入参是ObservableEmitter,这个是事件发送器,我们用这个来发送onNextonErroronComplete事件。

    //代码位置:io.reactivex.ObservableOnSubscribe.java
    public interface ObservableOnSubscribe<T> {
        void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
    }
    

    接下来就要看ObservableCreate里面干了啥了。

    //代码位置:io.reactivex.internal.operators.observable.ObservableCreate.java
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    }
    

    what??? 仅仅是保存了我们传入的source。然后就没有然后了。

    所以说创建一个Observable的时候其实啥也没有发生,此时我们只有一个上游,没有下游,也没有连接上下游的管道,所以水流不通。我们已经知道下游是Observer了,那么管道是什么呢?

    对,就是这个subscribe()方法。

    注意注意,创建上游的时候我们返回的类型其实是ObservableCreate,从签名上可以看到这个类是Observable的子类。因为ObservableCreate没有重写父类的subscribe()方法,所以我们来看Observable中的subscribe()方法的实现。

    //代码位置:io.reactivex.Observable.java
    //已经化简,去掉了一些空检查等代码.
    public final void subscribe(Observer<? super T> observer) {
            try {
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
            }
        }
    

    核心代码就一行:subscribeActual(observer);

    这个实现,不用说,当然是ObservableCreate给的。

    //代码位置:io.reactivex.internal.operators.observable.ObservableCreate.java 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);//Point 1
        observer.onSubscribe(parent); //Point 2
        try {
            source.subscribe(parent);//Point 3
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    这段代码也很简洁。重点就3句话。

    先来讲Point 1 :创建了一个CreateEmitter,并把我们的下游(Observer)传入了。

    这个CreateEmitter是什么鬼?

    上文我们说到ObservableEmitter是事件发送器,然而他只是个接口,那么CreateEmitter是他的某个实现啦。

    上代码:

    //ObservableCreate的静态内部类。
    static final class CreateEmitter<T>  extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable{
         final Observer<? super T> observer;
    
         CreateEmitter(Observer<? super T> observer) {
          this.observer = observer;
         } 
    }
    

    创建CreateEmitter也只是把我们传入的Observer保存下来。

    接着讲Point 2:回调Observer.subscribe()方法,并把Point 1中创建的CreateEmitter传递出去。纳尼???Observer.subscribe()方法入参可是Disposable啊。果然这个CreateEmitter也实现了Disposable接口。所以说这个CreateEmitter既是事件发送器也是事件阶段器。这很好理解,最方便快捷的操作就是在事件发送的地方截断事件。

    最后Point 3: source.subscribe(parent). 啥??? source是啥,奥,还记得创建Observable的时候唯一做的事情就是保存了我们传入的ObservableOnSubscribe对象么?对,现在他给我们回调了。并且把一个事件发送器传递给了我们。这句话就回调到了我们用匿名内部类去实现的ObservableOnSubscribe接口。

    Observable
        .create(new ObservableOnSubscribe<String>() {//对对对,这个就是source
                @Override
                public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("abc");//对对对,这个e就是parent,就是内部创建的CreateEmitter
                }
            })
    

    所以呐,拿着这个事件发送器我们就可以发送事件啦!

    上文的代码里只发送了一个事件:e.onNext("abc")

    最后我们就可以来看调用onNext,onComplete,onError事件分别会发生什么了。

    //代码位置:当然还是在CreateEmitter啦。因为调用的就是e.onNext("abc")嘛
    @Override
        public void onNext(T t) {
          if (t == null) { //Point 1
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
          }
          if (!isDisposed()) {//Point 2
                observer.onNext(t);//Point 3
          }
        }
    

    Point 1: 因为RxJava2不允许空对象出现啦,包括之前的代码里也有很多空检查的操作。这个要注意了。

    Point 2: 就是先检测一下事件有没有被切断,如果被切断了(isDisposed()返回true),当然不能往下传了。

    Point 3: 还记得我们创建CreateEmitter时,把我们外部创建的Observer传入了吗?对,就是这个observer。所以发现没,就这么调用到了Observer.onNext()

    没错,事件就这样从传递过来啦。

    可以想象,onError和onComplete事件也是类似的。

            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t); //point 1
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);// point 2
                    } finally {
                        dispose();  //point 3
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();//point 4
                    } finally {
                        dispose();//point 5
                    }
                }
            }
    
    
    

    onError比较特殊,就是如果事件流已经被切断,再次发送onError,会触发point 1处的代码(具体不用挂心,反正就是会抛一个异常,UndeliverableException)。当然onError()方法传递的Throwable也不能为空!!!

    Point 2 和 Point 4分别是回调外部Observer的onErroronComplete方法。这个回调跟onNext是一毛一样的。

    从Point 3和Point 5来看,一旦发生onError或者onComplete事件,RxJava内部就会把事件流切断。切断的机制我们暂时不讲。我们知道CreateEmitter也是一个事件切断器,所以他当然会有dispose()方法去切断事件,isDispose()方法判断事件有没有被切断。

    OK! 本文要讲的差不多了。最后附上一张类图,帮助理解和回忆。

    RxJava2.png

    ps:之所以我们有时候搞不清楚内部实现的逻辑,是因为对外暴露的永远都是接口,接口,接口,而内部的实现可能有很多其他角色在表演。

    相关文章

      网友评论

        本文标题:RxJava浅析——事件如何从上游传递到下游

        本文链接:https://www.haomeiwen.com/subject/sjcpbxtx.html