Observable概述(RxJava2)

作者: uqduiba | 来源:发表于2017-11-12 23:07 被阅读80次

    上一篇文章中定义了Rx = Observable + Operator + Scheduler。Rx以经典观察者模式为骨架、并扩展之使得我们能够以类似使用Iterable的方式使用Observable。

    Rx最为重要的两个要素是:数据流和异步(实际上Rx把数据流都视作异步的)。今天的主角便是数据流——Observable。根据上下文语义的需要,本系列文中可能另称之为数据序列事件流被观察者

    观察者

    在Rx的世界中,(几乎)每一个故事都从“观察者订阅了数据流”开始。观察者——Observer——好比哨兵,时刻监视着数据流的动静,一旦有数据发射或通知发送便立即响应。观察者实现了以下三个方法的子集:

    • onNext -- 当数据流发射流中任意一个数据时会调用观察者的onNext方法,并将发射的数据作为参数。
    • onError -- 当数据流产生数据失败或发生其他异常时会调用观察者的onError方法,并将失败原因(Throwable)作为参数。
    • onComplete -- 当数据流中的所有数据全部正常发射完会调用观察着的onComplete方法。

    当数据流调用观察者的onError/onComplete时,我们称它发送了错误/完成通知观察者只能收到来自某个数据流的一个通知,也就是说如果收到了流的错误通知,就不可能再收到该流的完成通知,反之亦然。一旦观察者收到了通知,便不能接收任何由该流发射的数据

    注:数据流可以发送多个通知,也可以在发送通知之后继续发送数据,只是观察者收到通知后就单方面把该流“拉黑”了而已。有时候为了实现一些特殊功能,我们不得不允许Observer不受限制地接收数据和通知(RxJava2的源码中也存在着这样的实现,比如:ObservableConcatMap.SourceObserver.InnerObserver就可以多次接收onComplete通知)。


    Rx编程模型

    我们先看一个常规的方法调用过程,程序会按照代码书写的顺序逐步地执行指令并返回结果,以同步的方式完成任务:

    1. 先调用某个方法。
    2. 把方法的返回值赋值给某个变量。
    3. 使用该变量执行后续指令以完成任务。

    在Rx中,数据流用于定义产生、处理数据的机制,一旦有观察者订阅(subscribe)了该流,其预定义的机制立即生效,观察者等待数据发射或通知发送并响应:

    1. 定义一个数据流,该流定义了一个异步操作,可以产生一个或多个数据。
    2. 定义一个观察者,并为它定义一个方法(onNext),该方法用来消费第一步的异步操作发射的数据。
    3. 观察者订阅数据流(于是故事开始了),数据流的异步操作被触发,然后生产发射数据,或发送通知(以结束整个故事)。

    如果程序需要完成多个不存在互相依赖的任务,由于Rx中指令可以异步并发地执行,我们可以同时启动多个任务,而不用依次地等待某个任务完成再启动下一个。


    Observable操作符

    掌握数据流和观察者之后,我们能比以前更好地处理数据序列(而不限于单个数据)。然而Rx真正的核武器是操作符Operator。我们先了解一下Rx有哪些操作符。

    • 创建型
      Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
    • 变换型
      Buffer, FlatMap, GroupBy, Map, Scan, Window
    • 过滤型
      Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
    • 组合型
      And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
    • 容错型
      Catch, Retry
    • 工具型
      Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
    • 条件型
      All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
    • 聚合型
      Average, Concat, Count, Max, Min, Reduce, Sum
    • 转换型
      To
    • 连接型
      Connect, Publish, RefCount, Replay
    • 背压型

    多数操作符仍然返回一个数据流,这种方式允许我们在程序中链式地对数据流调用操作符——联想一下builder(构建者)模式的链式调用——与builder模式不同的是,Observable的操作符返回了一个新Observable,这个新Observable原Observable的代理。
    应用了操作符后,单单用“数据流”已经无法准确描述Observable的含义。我增加“原始流”、“上游”和“下游”以及“支流(流中流)”来区分不同意义的Observable。“数据流”是Observable的泛称。

    • 原始流——全称为“原始数据流”,指代由创建型操作符返回的Observable
    • 上游和下游——二者必须成对地出现。对Observable调用非创建型操作符后,“上游”指代原Observable,“下游”指代返回的新Observable
    • 支流或流中流——仅仅在应用FlatMapConcatMap操作符的场景中使用这一称谓。“支流”指代这两个操作符的mapper返回的子Observable,“支流”亦称“流中流”。
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
    

    flatMap的方法签名分析,它接受一个mapper参数,此mapper将上游中的数据变换成一个ObservableSource新Observable中的数据是ObservableSource类型——颇有子Observable的味道——这不就是“流中流”(“流中流”自带解释功能,理解之后还是叫“支流”比较自然)吗?
    需要注意的是:最终Rx将整合子Observable(支流)中的所有数据而不是子Observable本身汇入下游。后面的文章会详细地对FlatMapConcatMap进行源码分析。


    RxJava2#Observable类(源码基于v2.1.5)

    Observable是一个抽象类,实现了ObservableSource(void subscribe())接口。该类有且仅有一个抽象方法subscribeActual,其他非privateprivate方法也就3个)方法要么是static的,要么是final的。这意味着定义自己的ObservableCustom是件非常简单的事情,Observable类已经完成了99.99%的工作,我们只需要override subscribeActual方法就够了。

    Observable所有的创建型操作符都是静态的,比如Just:

    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        // 通过RxJavaPlugins的setters可以在运行时改变默认的行为
        // 如果程序中没有调用RxJavaPlugins.setOnObservableAssembly(xxx),下面一行代码跟其后一行注释完全等效
        return RxJavaPlugins.onAssembly(new ObservableJust(item));
        // return new ObservableJust(item); 
    }
    

    我们可以看到Just操作符本质上构造了一个ObservableJust对象。RxJava2内置了大量的ObservableXXXXXX往往是操作符的名字比如Just)。

    再来看一个非创建型的操作符Map:

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
    }
    

    以及ObservableMap类的核心代码:

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;
    
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }
    
        public void subscribeActual(Observer<? super U> t) {
            this.source.subscribe(new ObservableMap.MapObserver(t, this.function));
        }
    }
    

    重点看ObservableMap构造方法,它接收ObservableSource类型的对象作为第一个参数——回忆一下代理模式——创建了原Observable的代理,也就是新ObservableMap实例。
    RxJava2中大量运用了代理模式,细心的你或许已经发现:在subscribeActual方法中,还创建了一个原Observer的代理——MapObserver的实例。


    本系列将持续更新

    相关文章

      网友评论

      • mudcastles:你好,我使用rejava+retrofit,在apiService里面想要返回Observable<T>,我该怎么做?似乎不能直接这么做,但是网络返回的数据不统一,我需要动态确定T的类型
        mudcastles:Rxjava,不好意思,打成了rejava

      本文标题:Observable概述(RxJava2)

      本文链接:https://www.haomeiwen.com/subject/fadomxtx.html