上一篇文章中定义了Rx = Observable + Operator + Scheduler。Rx以经典观察者模式为骨架、并扩展之使得我们能够以类似使用Iterable的方式使用Observable。
Rx最为重要的两个要素是:数据流和异步(实际上Rx把数据流都视作异步的)。今天的主角便是数据流——Observable。根据上下文语义的需要,本系列文中可能另称之为数据序列、事件流、被观察者。
观察者
在Rx的世界中,(几乎)每一个故事都从“观察者订阅了数据流”开始。观察者——Observer——好比哨兵,时刻监视着数据流的动静,一旦有数据发射或通知发送便立即响应。观察者实现了以下三个方法的子集:
-
onNext
-- 当数据流发射流中任意一个数据时会调用观察者的onNext
方法,并将发射的数据作为参数。 -
onError
-- 当数据流产生数据失败或发生其他异常时会调用观察者的onError
方法,并将失败原因(Throwable
)作为参数。 -
onComplete
-- 当数据流中的所有数据全部正常发射完会调用观察着的onComplete
方法。
当数据流调用观察者的onError/onComplete时,我们称它发送了错误/完成通知。观察者只能收到来自某个数据流的一个通知,也就是说如果收到了流的错误通知,就不可能再收到该流的完成通知,反之亦然。一旦观察者收到了通知,便不能接收任何由该流发射的数据。
注:数据流可以发送多个通知,也可以在发送通知之后继续发送数据,只是观察者收到通知后就单方面把该流“拉黑”了而已。有时候为了实现一些特殊功能,我们不得不允许Observer
不受限制地接收数据和通知(RxJava2的源码中也存在着这样的实现,比如:ObservableConcatMap.SourceObserver.InnerObserver
就可以多次接收onComplete
通知)。
Rx编程模型
我们先看一个常规的方法调用过程,程序会按照代码书写的顺序逐步地执行指令并返回结果,以同步的方式完成任务:
- 先调用某个方法。
- 把方法的返回值赋值给某个变量。
- 使用该变量执行后续指令以完成任务。
在Rx中,数据流用于定义产生、处理数据的机制,一旦有观察者订阅(subscribe
)了该流,其预定义的机制立即生效,观察者等待数据发射或通知发送并响应:
- 定义一个数据流,该流定义了一个异步操作,可以产生一个或多个数据。
- 定义一个观察者,并为它定义一个方法(
onNext
),该方法用来消费第一步的异步操作发射的数据。 - 观察者订阅数据流(于是故事开始了),数据流的异步操作被触发,然后生产发射数据,或发送通知(以结束整个故事)。
如果程序需要完成多个不存在互相依赖的任务,由于Rx中指令可以异步并发地执行,我们可以同时启动多个任务,而不用依次地等待某个任务完成再启动下一个。
Observable操作符
掌握数据流和观察者之后,我们能比以前更好地处理数据序列(而不限于单个数据)。然而Rx真正的核武器是操作符Operator。我们先了解一下Rx有哪些操作符。
-
创建型
Create
,Defer
,Empty
/Never
/Throw
,From
,Interval
,Just
,Range
,Repeat
,Start
,Timer
-
变换型
Buffer
,FlatMap
,GroupBy
,Map
,Scan
,Window
-
过滤型
Debounce
,Distinct
,ElementAt
,Filter
,First
,IgnoreElements
,Last
,Sample
,Skip
,SkipLast
,Take
,TakeLast
-
组合型
And
/Then
/When
,CombineLatest
,Join
,Merge
,StartWith
,Switch
,Zip
-
容错型
Catch
,Retry
-
工具型
Delay
,Do
,Materialize
/Dematerialize
,ObserveOn
,Serialize
,Subscribe
,SubscribeOn
,TimeInterval
,Timeout
,Timestamp
,Using
-
条件型
All
,Amb
,Contains
,DefaultIfEmpty
,SequenceEqual
,SkipUntil
,SkipWhile
,TakeUntil
,TakeWhile
-
聚合型
Average
,Concat
,Count
,Max
,Min
,Reduce
,Sum
-
转换型
To
-
连接型
Connect
,Publish
,RefCount
,Replay
- 背压型
多数操作符仍然返回一个数据流,这种方式允许我们在程序中链式地对数据流调用操作符——联想一下builder(构建者)模式的链式调用——与builder模式不同的是,Observable
的操作符返回了一个新Observable
,这个新Observable
是原Observable
的代理。
应用了操作符后,单单用“数据流”已经无法准确描述Observable
的含义。我增加“原始流”、“上游”和“下游”以及“支流(流中流)”来区分不同意义的Observable
。“数据流”是Observable
的泛称。
- 原始流——全称为“原始数据流”,指代由创建型操作符返回的
Observable
。 - 上游和下游——二者必须成对地出现。对
Observable
调用非创建型操作符后,“上游”指代原Observable
,“下游”指代返回的新Observable
。 - 支流或流中流——仅仅在应用
FlatMap
和ConcatMap
操作符的场景中使用这一称谓。“支流”指代这两个操作符的mapper
返回的子Observable
,“支流”亦称“流中流”。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
从flatMap
的方法签名分析,它接受一个mapper参数,此mapper将上游中的数据变换成一个ObservableSource
,新Observable
中的数据是ObservableSource
类型——颇有子Observable
的味道——这不就是“流中流”(“流中流”自带解释功能,理解之后还是叫“支流”比较自然)吗?
需要注意的是:最终Rx将整合子Observable
(支流)中的所有数据而不是子Observable
本身汇入下游。后面的文章会详细地对FlatMap
和ConcatMap
进行源码分析。
RxJava2#Observable类(源码基于v2.1.5)
Observable
是一个抽象类,实现了ObservableSource(void subscribe())
接口。该类有且仅有一个抽象方法subscribeActual
,其他非private
(private
方法也就3个)方法要么是static
的,要么是final
的。这意味着定义自己的ObservableCustom
是件非常简单的事情,Observable
类已经完成了99.99%的工作,我们只需要override
subscribeActual
方法就够了。
Observable
所有的创建型操作符都是静态的,比如Just:
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
// 通过RxJavaPlugins的setters可以在运行时改变默认的行为
// 如果程序中没有调用RxJavaPlugins.setOnObservableAssembly(xxx),下面一行代码跟其后一行注释完全等效
return RxJavaPlugins.onAssembly(new ObservableJust(item));
// return new ObservableJust(item);
}
我们可以看到Just
操作符本质上构造了一个ObservableJust
对象。RxJava2内置了大量的ObservableXXX
(XXX
往往是操作符的名字比如Just
)。
再来看一个非创建型的操作符Map:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
}
以及ObservableMap
类的核心代码:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
public void subscribeActual(Observer<? super U> t) {
this.source.subscribe(new ObservableMap.MapObserver(t, this.function));
}
}
重点看ObservableMap
构造方法,它接收ObservableSource
类型的对象作为第一个参数——回忆一下代理模式——创建了原Observable
的代理,也就是新ObservableMap
实例。
RxJava2中大量运用了代理模式,细心的你或许已经发现:在subscribeActual
方法中,还创建了一个原Observer
的代理——MapObserver
的实例。
网友评论