所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列
要想理解RxJava的各种操作符的话,结合着弹子图会比较好。

1.map( )
- 对Observable发射的每一项数据应用一个函数,执行变换操作
- map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。RxJava将这个操作符实现为map函数。
- 这个操作符默认不在任何特定的调度器上执行。

我们来看一个 map()的例子:
private void map01() {
Observable.just("a", "b", "c")
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
if (s.equals("a")) {
return 1;
} else if (s.equals("b")) {
return 2;
} else if (s.equals("c")) {
return 3;
} else {
return 0;
}
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("Magic", integer + "");
}
});
}
map()方法将参数中的 String对象转换成一个Integer对象后返回,而在经过 map()
方法后,事件的参数类型也由 String转为了 Integer。
2.flatmap( )
我们先来看两段代码
//通过搜索的内容,返回网址集合
public Observable<List<String>> queryUrls(String text) {
List<String> urlList;
if (text.contains("news")) {
urlList = Arrays.asList("www.baidu.com", "www.sina.com.cn", "www.qq.com");
} else if (text.contains("buy")) {
urlList = Arrays.asList("www.taobao.com", "www.jd.com");
} else {
urlList = Arrays.asList("www.google.com", "www.youtube.com");
}
return Observable.just(urlList);
}
private void flatmap01() {
queryUrls("")
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> urls) {
return Observable.from(urls);
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String url) {
Log.e("Magic", url);
}
});
}
从上面的代码可以看出, flatMap()和 map()有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map()不同的是, flatMap()中返回的是个Observable对象,并且这个Observable对象并不是被直接发送到了Subscriber的回调方法中。
flatMap()的原理是这样的
- 使用传入的事件对象创建一个 Observable对象;
- 并不发送这个Observable, 而是将它激活,于是它开始发送事件;
- 每一个创建出来的Observable发送的事件,都被汇入同一个Observable,而这个Observable负责将这些事件统一交给Subscriber的回调方法。
这三个步骤,把事件拆成了两级,通过一组新创建的Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是flatMap()所谓的flat。
我们来看下面一段代码,用flatmap来模拟一次嵌套的网络请求
//通过网址,返回网站名称
public Observable<String> getTitle(String url) {
if (url.contains("baidu")) {
return Observable.just("百度");
} else if (url.contains("sina")) {
return Observable.just("新浪");
} else if (url.contains("qq")) {
return Observable.just("腾讯");
} else if (url.contains("taobao")) {
return Observable.just("淘宝");
} else if (url.contains("jd")) {
return Observable.just("京东");
} else if (url.contains("google")) {
return Observable.just("谷歌");
} else if (url.contains("youtube")) {
return Observable.just("油管");
} else {
return Observable.just("404");
}
}
//嵌套flatMap
private void flatmap02() {
queryUrls("news")
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> urls) {
return Observable.from(urls);
}
})
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String url) {
return getTitle(url);
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String title) {
Log.e("Magic", title);
}
});
}
最后我们再来看一下flatmap的弹子图

3.concatMap( )
- .concatMap()与flatMap()这两个方法似乎相差无几,但有一点不同:用操作符合并最终结果的时候。
- flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable
发送时的顺序。而concatMap()则反之。 - flatMap()使用merge()操作符,而concatMap()使用concat()操作符,这就意味着后者遵循元素的顺序,所以,请留意是否需要保持元素次序。

private void concatMap01() {
ArrayList<Integer> numbers = new ArrayList<>(Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9));
Observable.from(numbers)
.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer * integer)
.subscribeOn(Schedulers.newThread());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("Magic", integer.toString() +"");
}
});
}
4.flatMapIterable
flatMapIterable和flatMap类似,区别是 flatMap参数把每个数据转换为一个新的 Observable,而flatMapIterable参数把一个数据转换为一个新的iterable对象。
private void flatMapIterable() {
Observable.range(1, 4)
.flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> call(Integer integer) {
List<Integer> list = new ArrayList<>();
for (int i = 1; i < 1 + integer; i++) {
list.add(i);
}
return list;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("Magic", integer.toString() + "");
}
});
}
5.switchMap( )
switch()和flatMap()很像,除了一点,当源Observable发射一个新的数据项时,如果旧数据项订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项。
private void switchMap() {
Observable.just("A", "B", "C", "D", "E")
.switchMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.just(s)
.subscribeOn(Schedulers.newThread());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.e("Magic", "------>onCompleted()");
}
@Override
public void onError(Throwable e) {
Log.e("Magic", "------>onError()" + e);
}
@Override
public void onNext(String s) {
Log.e("Magic", "------>onNext:" + s);
}
});
}

6.scan( )
连续地对数据序列的每一项应用一个函数,然后连续发射结果。
Scan操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator。

private void scan() {
Observable.just(1, 2, 3, 4, 5)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum + item;
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
Log.e("Magic", "Next: " + item);
}
@Override
public void onError(Throwable error) {
Log.e("Magic", "Error: " + error.getMessage());
}
@Override
public void onCompleted() {
Log.e("Magic", "Sequence complete.");
}
});
}
6.groupBy( )
- 将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列;
- GroupBy操作符将原始Observable分拆为一些Observables集合,它们中的每一个发射原始Observable数据序列的一个子序列。哪个数据项由哪一个Observable发射是由一个函数判定的,这个函数给每一项指定一个Key,Key相同的数据会被同一个Observable发射;
- RxJava实现了groupBy操作符。它返回Observable的一个特殊子类;GroupedObservable,实现了GroupedObservable接口的对象有一个额外的方法getKey,这个Key用于将数据分组到指定的Observable。

private void groupBy() {
Observable.range(0, 10)
.groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
//分成0,1两个个小组
return integer % 2;
}
})
.subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
@Override
public void onCompleted() {
Log.e("Magic", "------>onCompleted()");
}
@Override
public void onError(Throwable e) {
Log.e("Magic", "------>onError()" + e);
}
@Override
public void onNext(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {
integerIntegerGroupedObservable
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e("Magic", "------>inner onCompleted()");
}
@Override
public void onError(Throwable e) {
Log.e("Magic", "------>inner onError()" + e);
}
@Override
public void onNext(Integer integer) {
Log.e("Magic", "------>group:" + integerIntegerGroupedObservable.getKey() + " value:" + integer);
}
});
}
});
}
7.buffer()
- 定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值;
- 能一次性集齐多个结果到列表中,订阅后自动清空相应结果,直到完全清除;
-
也可以周期性的集齐多个结果到列表中,订阅后自动清空相应结果,直到完全清除。
buffer()
//一次订阅2个
private void buffer01() {
Observable.range(1, 5)
.buffer(2)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onCompleted() {
Log.e("Magic", "-----------------onCompleted:");
}
@Override
public void onError(Throwable e) {
Log.e("Magic", "----------------->onError:");
}
@Override
public void onNext(List<Integer> strings) {
Log.e("Magic", "----------------->onNext:" + strings);
}
});
}
//周期性订阅多个结果
private void buffer05() {
Observable
.interval(1, TimeUnit.SECONDS)
.buffer(3, TimeUnit.SECONDS)
.subscribe(new Observer<List<Long>>() {
@Override
public void onCompleted() {
Log.e("Magic", "-----------------onCompleted:");
}
@Override
public void onError(Throwable e) {
Log.e("Magic", "----------------->onError:");
}
@Override
public void onNext(List<Long> longs) {
Log.e("Magic", "----------------->onNext:" + longs);
}
});
}
8.window()
- 定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据;
- Window和Buffer类似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个集,最后发射一个onCompleted通知。

private void window() {
Observable
.interval(1, TimeUnit.SECONDS)
.take(10)
.window(3, TimeUnit.SECONDS)
.subscribe(new Observer<Observable<Long>>() {
@Override
public void onCompleted() {
Log.e("Magic", "------>onCompleted()");
}
@Override
public void onError(Throwable e) {
Log.e("Magic", "------>onError()" + e);
}
@Override
public void onNext(Observable<Long> longObservable) {
Log.e("Magic", "------->onNext()");
longObservable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e("Magic", "------>call():" + aLong);
}
});
}
});
}
9.cast()
- cast操作符将原始Observable发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是map的一个特殊版本;
-
可以用于校验是否是同一种类型,不够智能,如String->Integer 会出现转换失败异常。
cast()
private void cast() {
Observable
.just(1, 2, 3, 4, 5)
.cast(Integer.class)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e("Magic", "------>onCompleted()");
}
@Override
public void onError(Throwable e) {
Log.e("Magic", "------>onError()" + e);
}
@Override
public void onNext(Integer integer) {
Log.e("Magic", "------>onNext()" + integer);
}
});
}
网友评论