美文网首页
RxJava2 源码二:链式的秘密

RxJava2 源码二:链式的秘密

作者: MxsQ | 来源:发表于2018-09-10 15:25 被阅读0次

    前言

    上一篇文章,对Rx进行了梳理,大致了解了什么是Rx,并对RxJava的基本流程进行了跟踪,如忘记其中关键点,及时回顾。

    传送门

    关于链式

    从Rx的机制上来说,事件的持续进行运转于调用链之上,也就是说,调用链上的每一环节,承载了各自的任务,并对事件的最终完成或异常状态提供了对应出口。那分门别类的任务,是如何嵌入调用链中,并得以完成的呢? 此篇文章,将对此进行解析。

    链式

    回顾

    在解析之前,先对基本流程进行回顾:

    1. 拿到Observable
    2. Observable.subscribe()与观察者签订
    3. subscribeActual()获得执行时机,执行具体逻辑,通知观察者已签订
    4. 数据或事件到来,观察者进行响应

    案例

    Observable<Integer> ob = Observable.fromArray(new Integer[]{1,2,3,4,5,6});  // 数据
    ob.filter(new Predicate<Integer>() { // 过滤
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer % 2 == 0;
        }
    }).map(new Function<Integer, String>() { // 数据转换
        @Override
        public String apply(Integer integer) throws Exception {
            return integer.toString();
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            
        }
    
        @Override
        public void onNext(String s) { // 响应
            Log.d(TAG, "onNext: " + s);
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onComplete() { // 响应
            Log.d(TAG, "onComplete: ");
        }
    });
    

    从上游向下游发射了6个数据,过滤掉不满足过滤条件 integer % 2 == 0 的数据,并将过滤后的数据转换为String类型,下游最终拿到3个数据,进行了4次响应。
    这一调用链中,进行了两个额外任务——过滤、转换,但却能保证最终到达下游,那,如何完成?

    上游的构建

    从基本流程看,第一步,拿到Observable。从每次调用的返回看,包括Observable.fromArray(),Observable.filter(),Observable.map()均会拿到Observable,但,Observable会有不同。分别进行跟踪,在当前情况下,拿到的Observable依次如下:

    // Observable.filter() 
    public final Observable<T> filter(Predicate<? super T> predicate) {
        ObjectHelper.requireNonNull(predicate, "predicate is null");
        return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
    }
    
    // Observable.map()
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
    
    // Observable.fromArray()
    public ObservableFromArray(T[] array) {
        this.array = array;
    }
    
    Observable.filter()    
    public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
        super(source);
        this.predicate = predicate;
    }
    
    Observable.map()
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    

    除了fromArray()外,均接收一个ObservableSource参数,并返回一个Observable。而接收到的ObservableSource,就是调用当前接口的Observable , 显然,Observable被装饰了,并且是层层装饰。 注意到,在Rx提供的类似fromArray(),比如前文的Observable.create()的一系列方法,接收参数并不遵守这一规则。原因是,这一系列接口所提供的Observable所处位置比较特别,这类Observable处于事件的的发源地,作为接收数据的源头,而数据的来源方式,千差万别,因此这类Observable的构造方式也大不相同。

    这里需插播一个信息,即Observable.map()与Observable.filter()这一类操作所拿到的Observable被标示为AbstractObservableWithUpstream,取map()来看,如下

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U>
    
    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T>
    

    AbstractObservableWithUpstream的作用在于,指明此类Observable,将会对所包装的Observable进行额外处理。 其实,Observer也会有相应的行为,下面再谈。

    回到当前,说到经过一系列的调用之后,形成了一条调用链,而拿到的Observable是被层层包装的,因此,实际拿到的是一条包装链,如下图:


    上游链条.jpg

    案例中,到Observable.map()为止,上游链条建立完毕,由外至内为ObservableMap -> ObservableFilter -> ObservableFromArray。紧接着,与观察者签订。

    下游的构建

    与上游对应的,其实在签订之后,下游也会形成链条的。
    再次回忆之前所说的基本流程:

    1. 拿到Observable
    2. Observable.subscribe()与观察者签订
    3. subscribeActual()获得执行时机,执行具体逻辑,通知观察者已签订
    4. 数据或事件到来,观察者进行响应

    此刻,拿到了最外层为ObservableMap的上游链条(最后调用为Observable.map()),因此,实际与案例里定义的观察者签订的Observable为ObservableMap。在第一篇讲解中说过,subscribe()会将Observer交给subscribeActual(),由subscribeActual()来执行具体的逻辑。

    // map()拿到的ObservableMap
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    
    // filter()拿到的ObservableFilter
    @Override
    public void subscribeActual(Observer<? super T> s) {
        source.subscribe(new FilterObserver<T>(s, predicate));
    }
    
    // fromArray()拿到的ObservableFromArray
    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
        s.onSubscribe(d);
        if (d.fusionMode) {
            return;
        }
        d.run();
    }
    

    可以看到,从最外层的Observable开始,不断地执行subscribe() -> subscribeActual() -> subscribe() -> subscribeActual() ... 直到达到最内层Observable的 subscribeActual()。 期间通过subscribe()交接的Observer也经历了对应的包装变化。Observer的包装次序为 自定义的Observer <- MapObserver <- FilterObserver 。 除最内层的Observer外,其他的Observer均以另一个Observer为构造参数,以便装饰,可通过跟踪任意Observer源码看出,此不贴出。

    当前,下游也形成了一条调用链。上下游的调用链如图:


    运行时调用链.jpg

    在到达最内层的Observable之后,Observer链构建完毕,并且最内的Observable将Observer链的最外层以Disposable形式交给API的使用者, 以上执行此承操作的代码为
    s.onSubscribe(d)。

    以上上下游链条的构建过程需要仔细琢磨,很重要。

    事件的处理

    到此处为止,RxJava的调用链式构造完毕了的。那么,为什么要这样做呢?

    一句话回答:“处理差异性需求。”

    在Rx中,创建、过滤、变换、结合、辅助操作等,提供的以操作符形式的让得以以链条形式完成一系列工作的行为,均是为了处理差异性需求,在本文案例中表现为fromArray(),filter(),map()。而对于种种的差异性需求来说,先以上游或下游为大环境,再以其中某一节点为具体场景进行处理。因此需要分别在上下游分别建立相应的调用链,并构建出场景明确的链条节点,以此达到Rx完成需求的目的。

    在有了双链并确定了节点场景之后,处理差异性需求就简单了,仅需在当前场景处理响应事件之后,让事件继续流动向下一节点皆可。如图


    节点需求.jpg

    当事件或数据来到当前节点时,一般会遵循如图顺序流动。

    回到案例。当前情况,已在上下游构建好相应的调用链,而在最内的Observable里,发射了数据

    // ObservableFromArray
    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
    
        s.onSubscribe(d);
    
        if (d.fusionMode) {
            return;
        }
        // 发射数据
        d.run();
    }
    
    // FromArrayDisposable
    void run() {
        T[] a = array; // 要发射的数据
        int n = a.length;
    
        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[I];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            // actual为最外的Observer,为FilterObserver
            actual.onNext(value);
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }
    

    在数据开始推送后,下游链的每个节点,依次接收到数据事件的到来,并进行相应的处理。根据之前的分析,当前案例的下游链为自定义的FilterObserver -> MapObserver -> 自定义的Observer 。 对应代码如下

    // FilterObserver
    @Override
    public void onNext(T t) {
        if (sourceMode == NONE) {
           ......
            if (b) {
                // 将满足过滤条件的数据发送下一节点,为MapObserver
                actual.onNext(t);
            }
        } else {
            actual.onNext(null);
    }
    
    // MapObserver
    @Override
    public void onNext(T t) {
        ......
        try {
            // 将数据t进行转换
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        // 将数据v发送给下一节点,为自定义的Observer
        actual.onNext(v);
    }
    

    当事件到达调用链的某一节点,此节点将进行对应的处理,并根据需求决定事件应如何收发,而在最终的节点里,将会收到经历了所有变迁的事件。

    注意到,在Observable.fromArray()、Observable.filter()、Observable.map()等操作符里,需要携带对应的参数,如Function、Predicate等。此类参数的作用为,携带链上节点所需信息,协助构建节点或协助处理事件,但不为本文重点,故不提及。

    总结

    Rx中,为了在链式表达中完整一系列事件需求,分别在上下游构建了对应的调用链。

    • 上游 : 对于上游来说,链的形成通过相应操作符的操作构建
    • 下游 : 对于下游来说,链通过Observable.subscribe()与Observable.subscribeActual()构建。

    事件到来时,节点将会进行响应,并根据自身特点在合适的时机将事件推送给下一节点。通过上下游调用链的运行机制,能让Rx得以以链条不断裂的前提下,完成复杂的事件交互,将事件的流程主干铺陈出来,剥离异常与分支流程。而Rx可插拔的运作方式也体现在节点的删减中。

    顺带提一句,节点拥有处理事件然后推送给下一节点的过程,而其中经历的时间与具体的处理是变化无穷,因此,节点是多维度的,仔细体会,自行想象。


    下一篇 :线程调度

    相关文章

      网友评论

          本文标题:RxJava2 源码二:链式的秘密

          本文链接:https://www.haomeiwen.com/subject/njpsgftx.html