Amb
给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据
amb当你传递多个Observable给Amb
时,它只发射其中一个Observable的数据和通知:首先发送通知给Amb
的那个,不管发射的是一项数据还是一个onError
或onCompleted
通知。Amb
将忽略和丢弃其它所有Observables的发射物。
RxJava的实现是amb
,有一个类似的对象方法ambWith
。例如,Observable.amb(o1,o2)
和o1.ambWith(o2)
是等价的。
这个操作符默认不在任何特定的调度器上执行。
Contains
判定一个Observable是否发射一个特定的值
contains给Contains
传一个指定的值,如果原始Observable发射了那个值,它返回的Observable将发射true,否则发射false。
相关的一个操作符IsEmpty
用于判定原始Observable是否没有发射任何数据。
contains
默认不在任何特定的调度器上执行。
- Javadoc: contains(Object)
RxJava中还有一个exists
操作符,它通过一个谓词函数测试原始Observable发射的数据,只要任何一项满足条件就返回一个发射true的Observable,否则返回一个发射false的Observable。
exists
默认不在任何特定的调度器上执行。
- Javadoc: exists(Func1)
isEmpty
默认不在任何特定的调度器上执行。
- Javadoc: isEmpty()
DefaultIfEmpty
发射来自原始Observable的值,如果原始Observable没有发射任何值,就发射一个默认值
defaultIfEmtpyDefaultIfEmpty
简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以onCompleted
d的形式),DefaultIfEmpty
返回的Observable就发射一个你提供的默认值。
RxJava将这个操作符实现为defaultIfEmpty
。它默认不在任何特定的调度器上执行。
- Javadoc: defaultIfEmpty(T)
还有一个新的操作符switchIfEmpty
,不在RxJava 1.0.0版中,它和defaultIfEmtpy
类似,不同的是,如果原始Observable没有发射数据,它发射一个备用Observable的发射物。
SequenceEqual
判定两个Observables是否发射相同的数据序列。
sequenceEqual传递两个Observable给SequenceEqual
操作符,它会比较两个Observable的发射物,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射true,否则发射false。
它还有一个版本接受第三个参数,可以传递一个函数用于比较两个数据项是否相同。
这个操作符默认不在任何特定的调度器上执行。
SkipUntil
丢弃原始Observable发射的数据,直到第二个Observable发射了一项数据
skipUntilSkipUntil
订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。
RxJava中对应的是skipUntil
,它默认不在任何特定的调度器上执行。
- Javadoc: skipUntil(Observable)
SkipWhile
丢弃Observable发射的数据,直到一个指定的条件不成立
skipWhileSkipWhile
订阅原始的Observable,但是忽略它的发射物,直到你指定的某个条件变为false的那一刻,它开始发射原始Observable。
skipWhile
默认不在任何特定的调度器上执行。
- Javadoc: skipWhile(Func1)
TakeUntil
当第二个Observable发射了一项数据或者终止时,丢弃原始Observable发射的任何数据
takeUntilTakeUntil
订阅并开始发射原始Observable,它还监视你提供的第二个Observable。如果第二个Observable发射了一项数据或者发射了一个终止通知,TakeUntil
返回的Observable会停止发射原始Observable并终止。
RxJava中的实现是takeUntil
。注意:第二个Observable发射一项数据或一个onError
通知或一个onCompleted
通知都会导致takeUntil
停止发射数据。
takeUntil
默认不在任何特定的调度器上执行。
- Javadoc: takeUntil(Observable)
还有一个版本的takeUntil
,不在RxJava 1.0.0版中,它使用一个谓词函数而不是第二个Observable来判定是否需要终止发射数据,它的行为类似于takeWhile
。
- Javadoc: takeUntil(Func1)
TakeWhile
发射Observable发射的数据,直到一个指定的条件不成立
takeWhileTakeWhile
发射原始Observable,直到你指定的某个条件不成立的那一刻,它停止发射原始Observable,并终止自己的Observable。
RxJava中的takeWhile
操作符返回一个镜像原始Observable行为的Observable,直到某一项数据你指定的函数返回false
那一刻,这个新的Observable发射onCompleted
终止通知。
takeWhile
默认不在任何特定的调度器上执行。
- Javadoc: takeWhile(Func1)
算术和聚合操作
本页展示的操作符用于对整个序列执行算法操作或其它操作,由于这些操作必须等待数据发射完成(通常也必须缓存这些数据),它们对于非常长或者无限的序列来说是危险的,不推荐使用。
rxjava-math
模块的操作符
- averageInteger( ) — 求序列平均数并发射
- averageLong( ) — 求序列平均数并发射
- averageFloat( ) — 求序列平均数并发射
- averageDouble( ) — 求序列平均数并发射
- max( ) — 求序列最大值并发射
- maxBy( ) — 求最大key对应的值并发射
- min( ) — 求最小值并发射
- minBy( ) — 求最小Key对应的值并发射
- sumInteger( ) — 求和并发射
- sumLong( ) — 求和并发射
- sumFloat( ) — 求和并发射
- sumDouble( ) — 求和并发射
其它聚合操作符
- concat( ) — 顺序连接多个Observables
- count( ) and countLong( ) — 计算数据项的个数并发射结果
- reduce( ) — 对序列使用reduce()函数并发射最终的结果
- collect( ) — 将原始Observable发射的数据放到一个单一的可变的数据结构中,然后返回一个发射这个数据结构的Observable
- toList( ) — 收集原始Observable发射的所有数据到一个列表,然后返回这个列表
- toSortedList( ) — 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表
- toMap( ) — 将序列数据转换为一个Map,Map的key是根据一个函数计算的
- toMultiMap( ) — 将序列数据转换为一个列表,同时也是一个Map,Map的key是根据一个函数计算的
算术和聚合操作
Average
计算原始Observable发射数字的平均值并发射它
averageAverage
操作符操作符一个发射数字的Observable,并发射单个值:原始Observable发射的数字序列的平均值。
这个操作符不包含在RxJava核心模块中,它属于不同的rxjava-math
模块。它被实现为四个操作符:averageDouble
, averageFloat
, averageInteger
, averageLong
。
如果原始Observable不发射任何数据,这个操作符会抛异常:IllegalArgumentException
。
Min
发射原始Observable的最小值
minMin
操作符操作一个发射数值的Observable并发射单个值:最小的那个值。
RxJava中,min
属于rxjava-math
模块。
min
接受一个可选参数,用于比较两项数据的大小,如果最小值的数据超过一项,min
会发射原始Observable最近发射的那一项。
minBy
类似于min
,但是它发射的不是最小值,而是发射Key最小的项,Key由你指定的一个函数生成。
Max
发射原始Observable的最大值
maxMax
操作符操作一个发射数值的Observable并发射单个值:最大的那个值。
RxJava中,max
属于rxjava-math
模块。
max
接受一个可选参数,用于比较两项数据的大小,如果最大值的数据超过一项,max
会发射原始Observable最近发射的那一项。
maxBy
类似于max
,但是它发射的不是最大值,而是发射Key最大的项,Key由你指定的一个函数生成。
Count
计算原始Observable发射物的数量,然后只发射这个值
countCount
操作符将一个Observable转换成一个发射单个值的Observable,这个值表示原始Observable发射的数据的数量。
如果原始Observable发生错误终止,Count
不发射数据而是直接传递错误通知。如果原始Observable永远不终止,Count
既不会发射数据也不会终止。
RxJava的实现是count
和countLong
。
示例代码
String[] items = new String[] { "one", "two", "three" };
assertEquals( new Integer(3), Observable.from(items).count().toBlocking().single() );
- Javadoc: count()
- Javadoc: countLong()
Sum
计算Observable发射的数值的和并发射这个和
sumSum
操作符操作一个发射数值的Observable,仅发射单个值:原始Observable所有数值的和。
RxJava的实现是sumDouble
, sumFloat
, sumInteger
, sumLong
,它们不是RxJava核心模块的一部分,属于rxjava-math
模块。
你可以使用一个函数,计算Observable每一项数据的函数返回值的和。
在StringObservable
类(这个类不是RxJava核心模块的一部分)中有一个stringConcat
操作符,它将一个发射字符串序列的Observable转换为一个发射单个字符串的Observable,后者这个字符串表示的是前者所有字符串的连接。
StringObservable
类还有一个join
操作符,它将一个发射字符串序列的Observable转换为一个发射单个字符串的Observable,后者这个字符串表示的是前者所有字符串以你指定的分界符连接的结果。
Concat
不交错的发射两个或多个Observable的发射物
concatConcat
操作符连接多个Observable的输出,就好像它们是一个Observable,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推。
直到前面一个Observable终止,Concat
才会订阅额外的一个Observable。注意:因此,如果你尝试连接一个"热"Observable(这种Observable在创建后立即开始发射数据,即使没有订阅者),Concat
将不会看到也不会发射它之前发射的任何数据。
在ReactiveX的某些实现中有一种ConcatMap
操作符(名字可能叫concat_all
, concat_map
, concatMapObserver
, for
, forIn/for_in
, mapcat
, selectConcat
或selectConcatObserver
),他会变换原始Observable发射的数据到一个对应的Observable,然后再按观察和变换的顺序进行连接操作。
StartWith
操作符类似于Concat
,但是它是插入到前面,而不是追加那些Observable的数据到原始Observable发射的数据序列。
Merge
操作符也差不多,它结合两个或多个Observable的发射物,但是数据可能交错,而Concat
不会让多个Observable的发射物交错。
RxJava中的实现叫concat
。
- Javadoc: concat(Observable)
- Javadoc: concat(Observable,Observable)
还有一个实例方法叫concatWith
,这两者是等价的:Observable.concat(a,b)
和a.concatWith(b)
。
Reduce
按顺序对Observable发射的每项数据应用一个函数并发射最终的值
reduceReduce
操作符对原始Observable发射数据的第一项应用一个函数,然后再将这个函数的返回值与第二项数据一起传递给函数,以此类推,持续这个过程知道原始Observable发射它的最后一项数据并终止,此时Reduce
返回的Observable发射这个函数返回的最终值。
在其它场景中,这种操作有时被称为累积
,聚集
,压缩
,折叠
,注射
等。
注意如果原始Observable没有发射任何数据,reduce
抛出异常IllegalArgumentException
。
reduce
默认不在任何特定的调度器上执行。
- Javadoc: reduce(Func2)
还有一个版本的reduce
额外接受一个种子参数。注意传递一个值为null
的种子是合法的,但是与不传种子参数的行为是不同的。如果你传递了种子参数,并且原始Observable没有发射任何数据,reduce
操作符将发射这个种子值然后正常终止,而不是抛异常。
- Javadoc: reduce(R,Func2)
提示:不建议使用reduce
收集发射的数据到一个可变的数据结构,那种场景你应该使用collect
。
collect
与reduce
类似,但它的目的是收集原始Observable发射的所有数据到一个可变的数据结构,collect
生成的这个Observable会发射这项数据。它需要两个参数:
- 一个函数返回可变数据结构
- 另一个函数,当传递给它这个数据结构和原始Observable发射的数据项时,适当地修改数据结构。
collect
默认不在任何特定的调度器上执行。
- Javadoc: collect(Func0,Action2)
异步操作
下面的这些操作符属于单独的rxjava-async
模块,它们用于将同步对象转换为Observable。
- start( ) — 创建一个Observable,它发射一个函数的返回值
- toAsync( ) or asyncAction( ) or asyncFunc( ) — 将一个函数或者Action转换为已Observable,它执行这个函数并发射函数的返回值
- startFuture( ) — 将一个返回Future的函数转换为一个Observable,它发射Future的返回值
- deferFuture( ) — 将一个返回Observable的Future转换为一个Observable,但是并不尝试获取这个Future返回的Observable,直到有订阅者订阅它
- forEachFuture( ) — 传递Subscriber方法给一个Subscriber,但是同时表现得像一个Future一样阻塞直到它完成
- fromAction( ) — 将一个Action转换为Observable,当一个订阅者订阅时,它执行这个action并发射它的返回值
- fromCallable( ) — 将一个Callable转换为Observable,当一个订阅者订阅时,它执行这个Callable并发射Callable的返回值,或者发射异常
- fromRunnable( ) — convert a Runnable into an Observable that invokes the runable and emits its result when a Subscriber subscribes将一个Runnable转换为Observable,当一个订阅者订阅时,它执行这个Runnable并发射Runnable的返回值
- runAsync( ) — 返回一个StoppableObservable,它发射某个Scheduler上指定的Action生成的多个actions
连接操作
这一节解释ConnectableObservable
和它的子类以及它们的操作符:
- ConnectableObservable.connect( ) — 指示一个可连接的Observable开始发射数据
- Observable.publish( ) — 将一个Observable转换为一个可连接的Observable
- Observable.replay( ) — 确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅
- ConnectableObservable.refCount( ) — 让一个可连接的Observable表现得像一个普通的Observable
一个可连接的Observable与普通的Observable差不多,除了这一点:可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()
被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。
The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output: 下面的示例代码展示了两个订阅者订阅同一个Observable的情况。第一种情形,它们订阅一个普通的Observable;第二种情形,它们订阅一个可连接的Observable,并且在两个都订阅后再连接。注意输出的不同:
示例 #1:
def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);
firstMillion.subscribe(
{ println("Subscriber #1:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #1 complete"); } // onCompleted
);
firstMillion.subscribe(
{ println("Subscriber #2:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #2 complete"); } // onCompleted
);
Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete
示例 #2:
def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();
firstMillion.subscribe(
{ println("Subscriber #1:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #1 complete"); } // onCompleted
);
firstMillion.subscribe(
{ println("Subscriber #2:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #2 complete"); } // onCompleted
);
firstMillion.connect();
Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete
Connect
让一个可连接的Observable开始发射数据给订阅者
connect可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect
操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。
RxJava中connect
是ConnectableObservable
接口的一个方法,使用publish
操作符可以将一个普通的Observable转换为一个ConnectableObservable
。
调用ConnectableObservable
的connect
方法会让它后面的Observable开始给发射数据给订阅者。
connect
方法返回一个Subscription
对象,可以调用它的unsubscribe
方法让Observable停止发射数据给观察者。
即使没有任何订阅者订阅它,你也可以使用connect
方法让一个Observable开始发射数据(或者开始生成待发射的数据)。这样,你可以将一个"冷"的Observable变为"热"的。
- Javadoc: connect()
- Javadoc: connect(Action1)
Publish
将普通的Observable转换为可连接的Observable
publish可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect
操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。
RxJava的实现为publish
。
- Javadoc: publish()
有一个变体接受一个函数作为参数。这个函数用原始Observable发射的数据作为参数,产生一个新的数据作为ConnectableObservable
给发射,替换原位置的数据项。实质是在签名的基础上添加一个Map
操作。
- Javadoc: publish(Func1)
RefCount
让一个可连接的Observable行为像普通的Observable
refCount可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect
操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。
RefCount
操作符把从一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable时,RefCount
连接到下层的可连接Observable。RefCount
跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。
RxJava中的实现为refCount
,还有一个操作符叫share
,它的作用等价于对一个Observable同时应用publish
和refCount
操作。
- Javadoc: refCount()
- Javadoc: share()
Replay
保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅
replay可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect
操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。
如果在将一个Observable转换为可连接的Observable之前对它使用Replay
操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,即使那些观察者在这个Observable开始给其它观察者发射数据之后才订阅。
RxJava的实现为replay
,它有多个接受不同参数的变体,有的可以指定replay
的最大缓存数量,有的还可以指定调度器。
- Javadoc: replay()
- Javadoc: replay(int)
- Javadoc: replay(long,TimeUnit)
- Javadoc: replay(int,long,TimeUnit)
有一种 replay
返回一个普通的Observable。它可以接受一个变换函数为参数,这个函数接受原始Observable发射的数据项为参数,返回结果Observable要发射的一项数据。因此,这个操作符其实是replay
变换之后的数据项。
- Javadoc: replay(Func1)
- Javadoc: replay(Func1,int)
- Javadoc: replay(Func1,long,TimeUnit)
- Javadoc: replay(Func1,int,long,TimeUnit)
实现自己的操作符
你可以实现你自己的Observable操作符,本文展示怎么做。
如果你的操作符是被用于创造一个Observable,而不是变换或者响应一个Observable,使用 create( )
方法,不要试图手动实现 Observable
。另外,你可以按照下面的用法说明创建一个自定义的操作符。
如果你的操作符是用于Observable发射的单独的数据项,按照下面的说明做:Sequence Operators 。如果你的操作符是用于变换Observable发射的整个数据序列,按照这个说明做:Transformational Operators 。
提示: 在一个类似于Groovy的语言Xtend中,你可以以 extension methods 的方式实现你自己的操作符 ,不使用本文的方法,它们也可以链式调用。详情参见 RxJava and Xtend
序列操作符
下面的例子向你展示了怎样使用lift( )
操作符将你的自定义操作符(在这个例子中是 myOperator
)与标准的RxJava操作符(如ofType
和map
)一起使用:
fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new MyOperator<T>()).map({"transformed by myOperator: " + it});
下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与lift()
搭配使用。
实现你的操作符
将你的自定义操作符定义为实现了 Operator
接口的一个公开类, 就像这样:
public class MyOperator<T> implements Operator<T> {
public MyOperator( /* any necessary params here */ ) {
/* 这里添加必要的初始化代码 */
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> s) {
return new Subscriber<t>(s) {
@Override
public void onCompleted() {
/* 这里添加你自己的onCompleted行为,或者仅仅传递完成通知: */
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable t) {
/* 这里添加你自己的onError行为, 或者仅仅传递错误通知:*/
if(!s.isUnsubscribed()) {
s.onError(t);
}
}
@Override
public void onNext(T item) {
/* 这个例子对结果的每一项执行排序操作,然后返回这个结果 */
if(!s.isUnsubscribed()) {
transformedItem = myOperatorTransformOperation(item);
s.onNext(transformedItem);
}
}
};
}
}
变换操作符
下面的例子向你展示了怎样使用 compose( )
操作符将你得自定义操作符(在这个例子中,是一个名叫myTransformer
的操作符,它将一个发射整数的Observable转换为发射字符串的)与标准的RxJava操作符(如ofType
和map
)一起使用:
fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new MyTransformer<Integer,String>()).map({"transformed by myOperator: " + it});
下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与compose()
搭配使用。
实现你的变换器
将你的自定义操作符定义为实现了 Transformer
接口的一个公开类,就像这样:
public class MyTransformer<Integer,String> implements Transformer<Integer,String> {
public MyTransformer( /* any necessary params here */ ) {
/* 这里添加必要的初始化代码 */
}
@Override
public Observable<String> call(Observable<Integer> source) {
/*
* 这个简单的例子Transformer应用一个map操作,
* 这个map操作将发射整数变换为发射整数的字符串表示。
*/
return source.map( new Func1<Integer,String>() {
@Override
public String call(Integer t1) {
return String.valueOf(t1);
}
} );
}
}
参见
其它需要考虑的
-
在发射任何数据(或者通知)给订阅者之前,你的序列操作符可能需要检查它的
Subscriber.isUnsubscribed( )
状态,如果没有订阅者了,没必要浪费时间生成数据项。 -
请注意:你的序列操作符必须复合Observable协议的核心原则:
- 它可能调用订阅者的
onNext( )
方法任意次,但是这些调用必须是不重叠的。 - 它只能调用订阅者的
onCompleted( )
或onError( )
正好一次,但不能都调用,而且不能在这之后调用订阅者的onNext( )
方法。 - 如果你不能保证你得操作符遵从这两个原则,你可以给它添加
serialize( )
操作符,它会强制保持正确的行为。
- 它可能调用订阅者的
-
请关注这里 Issue #1962 &mdash;需要有一个计划创建一个测试脚手架,你可以用它来写测试验证你的新操作符遵从了Observable协议。
-
不要让你的操作符阻塞别的操作。
-
When possible, you should compose new operators by combining existing operators, rather than implementing them with new code. RxJava itself does this with some of its standard operators, for example:
-
如果可能,你应该组合现有的操作符创建你的新操作符,而不是从零开始实现它。RxJava自身的标准操作符也是这样做的,例如:
-
first( )
被定义为take(1).single( )
-
ignoreElements( )
被定义为filter(alwaysFalse( ))
-
reduce(a)
被定义为scan(a).last( )
-
-
如果你的操作符使用了函数或者lambda表达式作为参数,请注意它们可能是异常的来源,而且要准备好捕获这些异常,并且使用
onError()
通知订阅者。
-
某些异常被认为是致命的,对它们来说,调用
onError()
毫无意义,那样或者是无用的,或者只是对问题的妥协。你可以使用Exceptions.throwIfFatal(throwable)
方法过滤掉这些致命的异常,并重新抛出它们,而不是试图发射关于它们的通知。 -
一般说来,一旦发生错误应立即通知订阅者,而不是首先尝试发射更多的数据。
-
请注意
null
可能是Observable发射的一个合法数据。频繁发生错误的一个来源是:测试一些变量并且将持有一个非null
值作为是否发射了数据的替代。一个值为null
的数据仍然是一个发射数据项,它与没有发射任何东西是不能等同的。 -
想让你的操作符在反压(backpressure)场景中变得得好可能会非常棘手。可以参考Dávid Karnok的博客 Advanced RxJava,这里有一个涉及到的各种因素和怎样处理它们的很值得看的讨论。
插件让你可以用多种方式修改RxJava的默认行为:
- 修改默认的计算、IO和新线程调度器集合
- 为RxJava可能遇到的特殊错误注册一个错误处理器
- 注册一个函数记录一些常规RxJava活动的发生
RxJavaSchedulersHook
这个插件让你可以使用你选择的调度器覆盖默认的计算、IO和新线程调度 (Scheduler
),要做到这些,需要继承 RxJavaSchedulersHook
类并覆写这些方法:
Scheduler getComputationScheduler( )
Scheduler getIOScheduler( )
Scheduler getNewThreadScheduler( )
Action0 onSchedule(action)
然后是下面这些步骤:
- 创建一个你实现的
RxJavaSchedulersHook
子类的对象。 - 使用
RxJavaPlugins.getInstance( )
获取全局的RxJavaPlugins对象。 - 将你的默认调度器对象传递给
RxJavaPlugins
的registerSchedulersHook( )
方法。
完成这些后,RxJava会开始使用你的方法返回的调度器,而不是内置的默认调度器。
RxJavaErrorHandler
这个插件让你可以注册一个函数处理传递给 Subscriber.onError(Throwable)
的错误。要做到这一点,需要继承 RxJavaErrorHandler
类并覆写这个方法:
void handleError(Throwable e)
然后是下面这些步骤:
- 创建一个你实现的
RxJavaErrorHandler
子类的对象。 - 使用
RxJavaPlugins.getInstance( )
获取全局的RxJavaPlugins对象。 - 将你的错误处理器对象传递给
RxJavaPlugins
的registerErrorHandler( )
方法。
完成这些后,RxJava会开始使用你的错误处理器处理传递给 Subscriber.onError(Throwable)
的错误。
RxJavaObservableExecutionHook
这个插件让你可以注册一个函数用于记录日志或者性能数据收集,RxJava在某些常规活动时会调用它。要做到这一点,需要继承 RxJavaObservableExecutionHook
类并覆写这些方法:
方法 | 何时调用 |
---|---|
onCreate( ) |
在 Observable.create( ) 方法中 |
onSubscribeStart( ) |
在 Observable.subscribe( ) 之前立刻 |
onSubscribeReturn( ) |
在 Observable.subscribe( ) 之后立刻 |
onSubscribeError( ) |
在Observable.subscribe( ) 执行失败时 |
onLift( ) |
在Observable.lift( ) 方法中 |
然后是下面这些步骤:
- 创建一个你实现的
RxJavaObservableExecutionHook
子类的对象。 - 使用
RxJavaPlugins.getInstance( )
获取全局的RxJavaPlugins对象。 - 将你的Hook对象传递给
RxJavaPlugins
的registerObservableExecutionHook( )
方法。
When you do this, RxJava will begin to call your functions when it encounters the specific conditions they were designed to take note of. 完成这些后,在满足某些特殊的条件时,RxJava会开始调用你的方法。
背压问题
背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略
简而言之,背压是流速控制的一种策略。
需要强调两点:
- 背压策略的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
- 背压(Backpressure)并不是一个像flatMap一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。
响应式拉取(reactive pull)
首先我们回忆之前那篇《关于Rxjava最友好的文章》,里面其实提到,在RxJava的观察者模型中,被观察者是主动的推送数据给观察者,观察者是被动接收的。而响应式拉取则反过来,观察者主动从被观察者那里去拉取数据,而被观察者变成被动的等待通知再发送数据。
结构示意图如下:
[图片上传失败...(image-9976a2-1629772337103)]
观察者可以根据自身实际情况按需拉取数据,而不是被动接收(也就相当于告诉上游观察者把速度慢下来),最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。
源码
public class FlowableOnBackpressureBufferStategy{
...
@Override
public void onNext(T t) {
if (done) {
return;
}
boolean callOnOverflow = false;
boolean callError = false;
Deque<T> dq = deque;
synchronized (dq) {
if (dq.size() == bufferSize) {
switch (strategy) {
case DROP_LATEST:
dq.pollLast();
dq.offer(t);
callOnOverflow = true;
break;
case DROP_OLDEST:
dq.poll();
dq.offer(t);
callOnOverflow = true;
break;
default:
// signal error
callError = true;
break;
}
} else {
dq.offer(t);
}
}
if (callOnOverflow) {
if (onOverflow != null) {
try {
onOverflow.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
onError(ex);
}
}
} else if (callError) {
s.cancel();
onError(new MissingBackpressureException());
} else {
drain();
}
}
...
}
在这段源码中,根据不同的背压策略进行了不同的处理措施,当然这只是列举了一段关于buffer背压策略的例子。
根源
产生背压问题的根源就是上游发送速度与下游的处理速度不均导致的,所以如果想要解决这个问题就需要通过匹配两个速率达到解决这个背压根源的措施。
通常有两个策略可供使用:
- 从数量上解决,对数据进行采样
- 从速度上解决,降低发送事件的速率
- 利用flowable和subscriber
使用Flowable
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一个参数
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE); //注意这句代码
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
upstream.subscribe(downstream);
我们注意到这次和Observable
有些不同. 首先是创建Flowable
的时候增加了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里我们直接用BackpressureStrategy.ERROR
这种方式, 这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException
. 其余的策略后面再来讲解.
另外的一个区别是在下游的onSubscribe
方法中传给我们的不再是Disposable
了, 而是Subscription
, 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()
方法可以切断水管, 同样的调用Subscription.cancel()
也可以切断水管, 不同的地方在于Subscription
增加了一个void request(long n)
方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:
s.request(Long.MAX_VALUE);
这是因为Flowable
在设计的时候采用了一种新的思路也就是响应式拉取
的方式来更好的解决上下游流速不均衡的问题, 与我们之前所讲的控制数量
和控制速度
不太一样, 这种方式用通俗易懂的话来说就好比是叶问打鬼子
, 我们把上游
看成小日本
, 把下游
当作叶问
, 当调用Subscription.request(1)
时, 叶问
就说我要打一个!
然后小日本
就拿出一个鬼子
给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用request(10)
, 叶问就又说我要打十个!
然后小日本又派出十个鬼子
给叶问, 然后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打...
所以我们把request当做是一种能力, 当成下游处理事件
的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM. 这也就完美的解决之前我们所学到的两种方式的缺陷, 过滤事件会导致事件丢失, 减速又可能导致性能损失. 而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !
同步情况
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) { //无限循环发事件
emitter.onNext(i);
}
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "" + integer);
}
});
当上下游工作在同一个线程
中时, 这时候是一个同步
的订阅关系, 也就是说上游
每发送一个事件必须
等到下游
接收处理完了以后才能接着发送下一个事件.
同步与异步的区别就在于有没有缓存发送事件的缓冲区。
异步情况
通过subscribeOn和observeOn来确定对应的线程,达到异步的效果,异步时会有一个对应的缓存区来换从从上游发送的事件。
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
背压策略:
- error, 缓冲区大概在128
- buffer, 缓冲区在1000左右
- drop, 把存不下的事件丢弃
- latest, 只保留最新的
- missing, 缺省设置,不做任何操作
上游从哪里得知下游的处理能力呢?我们来看看上游最重要的部分,肯定就是FlowableEmitter
了啊,我们就是通过它来发送事件的啊,来看看它的源码吧(别紧张,它的代码灰常简单):
public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable s);
void setCancellable(Cancellable c);
/**
* The current outstanding request amount.
* <p>This method is thread-safe.
* @return the current outstanding request amount
*/
long requested();
boolean isCancelled();
FlowableEmitter<T> serialize();
}
FlowableEmitter是个接口,继承Emitter,Emitter里面就是我们的onNext(),onComplete()和onError()三个方法。我们看到FlowableEmitter中有这么一个方法:
long requested();
img
同步request.png
这张图的意思就是当上下游在同一个线程中的时候,在下游
调用request(n)就会直接改变上游
中的requested的值,多次调用便会叠加这个值,而上游每发送一个事件之后便会去减少这个值,当这个值减少至0的时候,继续发送事件便会抛异常了。
异步request.png
可以看到,当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们调用request(1000)时,实际上改变的是下游主线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发。
Rxjava实例开发应用
- 网络请求处理(轮询,嵌套,出错重连)
- 功能防抖
- 从多级缓存获取数据
- 合并数据源
- 联合判断
- 与 Retrofit,RxBinding,EventBus结合使用
Rxjava原理
- Scheduler线程切换工作原理
- 数据的发送与接收(观察者模式)
- lift的工作原理
- map的工作原理
- flatMap的工作原理
- merge的工作原理
- concat的工作原理
网友评论