上一篇:RxJava 源码学习之创建操作符
本篇将通过源码来学习下RxJava 的变换操作符,期待与大家一起探讨学习。
Buffer
-
作用分析
Buffer 可义简单地理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个。
-
代码示例(一)
buffer(count)
以列表(List)的形式发射非重叠的缓存,每一个缓存至多包含来自原始Observable的count项数据(最后发射的列表数据可能少于count项)
//测是代码:
Integer[] nums = {1,2,3,4,5};
Observable.from(nums).buffer(2)
.subscribe(new Subscriber<List<Integer>>() {
@Override
public void onCompleted() { System.out.println("Sequence complete."); }
@Override
public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override
public void onNext(List<Integer> integers) {
System.out.print("integers = ");
for (Integer i: integers) {
System.out.print(i +" ");
}
System.out.println();
}
});
###################################################
输出结果:
integers = 1 2
integers = 3 4
integers = 5
Sequence complete.
-
代码示例(二)
buffer(count, skip)
从原始Observable的第一项数据开始创建新的缓存,此后每当收到skip
项数据,用count项数据填充缓存:开头的一项和后续的count-1
项,它以列表(List)的形式发射缓存,取决于count和skip的值,这些缓存可能会有重叠部分(比如skip < count时),也可能会有间隙(比如skip > count时)。
Integer[] nums = {1,2,3,4,5,6,7,8,9};
Observable.from(nums).buffer(2,3)
.subscribe(new Subscriber<List<Integer>>() {
@Override
public void onCompleted() { System.out.println("Sequence complete."); }
@Override
public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override
public void onNext(List<Integer> integers) {
System.out.print("integers = ");
for (Integer i: integers) {
System.out.print(i +" ");
}
System.out.println();
}
});
###################################################
输出结果:
integers = 1 2
integers = 4 5
integers = 7 8
Sequence complete.
Map
-
作用分析
Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。
-
代码示例
//将Integer 转成 String 输出
Integer[] nums = {1,2,3,4};
Observable.from(nums).map(new Func1<Integer, String>() {
@Override
public String call(Integer i) { return "this is i="+i; }})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() { System.out.println("Sequence complete."); }
@Override
public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); }
@Override
public void onNext(String s) { System.out.println("onNext = " + s); }});
###################################
输出结果:
onNext = this is i=1
onNext = this is i=2
onNext = this is i=3
onNext = this is i=4
Sequence complete.
FlatMap
-
作用分析
FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。
注意:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。如果想按顺序输出,可以用 concatMap
。
比较:RxJava Observable transformation: concatMap() vs. flatMap()
注意:如果任何一个通过这个flatMap操作产生的单独的Observable调用onError异常终止了,这个Observable自身会立即调用onError并终止。
-
代码示例
//测是代码
Integer[] nums = {1,2,3,4};
Observable.from(nums).flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer integer) {
return Observable.just("this is i="+integer.intValue());
}}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() { System.out.println("Sequence complete."); }
@Override
public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); }
@Override
public void onNext(String s) { System.out.println("onNext = " + s); }});
########################################
输出结果:
onNext = this is i=1
onNext = this is i=2
onNext = this is i=3
onNext = this is i=4
Sequence complete.
Window
-
作用分析
Window和Buffer类似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onCompleted通知。
Paste_Image.pngScan
-
作用分析
Scan操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。
Paste_Image.png-
代码示例
//测是代码
/**
* 累加操作
**/
Observable.just(1, 2, 3, 4, 5)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum + item;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) { System.out.println("Next: " + item); }
@Override
public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override
public void onCompleted() { System.out.println("Sequence complete."); }});
################################################
输出结果:
Next: 1
Next: 3
Next: 6
Next: 10
Next: 15
Sequence complete.
参考文档:
结束语
RxJava之变换操作符到此就学完啦,下一篇就开始学习RxJava之过滤操作符吧 :P
网友评论