笔者在上一篇博客介绍了 RxJava 的基本使用和它的线程控制,这篇博客介绍RxJava一个很核心、很牛逼的功能-操作符。
RxJava的操作符有很多,主要分为以下几大类:创建操作符、变换操作符、过滤操作符、组合操作符、错误处理符、辅助操作符、条件和布尔操作符等等。每一种类型操作符下又有很多个具体的操作符,笔者在这篇博客介绍其中几个操作符的使用,如果还想学习其他操作符的使用,文章末尾有传送门哦
上一篇博客的实战部分,用到了几个操作符,比如create、just、from,它们都是创建操作符,用于创建Observable的操作符。它比较简单,在本篇博客不再介绍。
变换操作符
RxJava一个很牛逼的地方是它提供了对事件序列进行变换的支持,所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
变化操作符有:FlatMap、Map、Buffer、Scan、GroupBy、Window
变换操作符主要介绍一下两个:
- Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
- FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
概念过于抽象,先看个图吧
map操作符.png结合这张图,map可以简单的理解为通过映射关系把一个对象转换成另外一个对象。比如注册的时候,把用户这个对象(包含用户名和密码)。作为参数通过网络请求进行注册,对返回的json数据解析完后又是一个结果对象,通过map可以将请求对象转换成结果对象。是不是很神奇且牛批。
下面用一个注册用户的栗子先介绍Map的使用
(1)请求接口
public interface HttpRequest {
@GET("zhuce/rxjava.php/")
Call<HttpResult> getRegisteResult(@Query("name") String name, @Query("pass") String pass);
}
(2)请求对象类
public class UserInfo {
private String name;
private String pass;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}
}
(3)结果对象类
public class HttpResult {
private String result;
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}
(4)map操作符的使用
public void mapOperation(View view) {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(baseUrl)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
final HttpRequest httpRequest = retrofit.create(HttpRequest.class);
Observable.just(getUserInfo())
.map(new Function<UserInfo, HttpResult>() {
@Override
public HttpResult apply(UserInfo userInfo) throws Exception {
HttpResult httpResult = httpRequest.getRegisteResult(userInfo.getName()
,userInfo.getPass()).execute().body();
return httpResult;
}
}).subscribeOn(Schedulers.io())
.subscribe(new Consumer<HttpResult>() {
@Override
public void accept(HttpResult httpResult) throws Exception {
//Log.d(TAG, "accept: " + httpResult.getResult());
mText.setText(httpResult.getResult());
}
});
}
先使用just这个创建符创建一个Observable,并获取用户的信息。之后使用map转换操作符
map(new Function<UserInfo, HttpResult>(){...});
map里需要一个Function对象,它有两个泛型参数,前者是转换前的对象、后者是转换后的对象
这样在回调的apply方法里就可以实现网络请求操作
HttpResult httpResult = httpRequest.getRegisteResult(userInfo.getName()
,userInfo.getPass()).execute().body();
不要忘记切换到io线程中操作
.subscribeOn(Schedulers.io())
最后就是订阅了,需要创建一个观察者消费该事件,进行ui的更新
.subscribe(new Consumer<HttpResult>() {
@Override
public void accept(HttpResult httpResult) throws Exception {
mText.setText(httpResult.getResult());
}
});
}
FlatMap变换操作符
FlatMap 和 map 有一个相同点:它也是把传入的参数转化之后返回另一个对象。
不同的是, flatMap返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中
FlatMap.pngflatMap过程原理如下
- 使用传入的事件对象创建一个 Observable 对象
- 并不发送这个 Observable, 而是将它激活,于是它开始发送事件
- 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。
这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象铺平(flat)之后通过统一路径分发了下去。
改写之前注册的例子
Observable.just(getUserInfo()).flatMap(new Function<UserInfo, ObservableSource<HttpResult>>() {
@Override
public ObservableSource<HttpResult> apply(UserInfo userInfo) throws Exception {
HttpResult httpResult = httpRequest.getRegisteResult(userInfo.getName(),
userInfo.getPass()).execute().body();
return Observable.just(httpResult);
}
}).subscribeOn(Schedulers.io())
.subscribe(new Consumer<HttpResult>() {
@Override
public void accept(HttpResult httpResult) throws Exception {
mText.setText(httpResult.getResult());
}
});
可以看到基本没有什么改动,最明显的不同就是在返回的结果对象中
flatMap(new Function<UserInfo, ObservableSource<HttpResult>>()
不直接是 HTTPResult 作为参数 而是一个ObservableSource。这样看并没有比map有优势啊,
但是如果有这样一个需求,注册完之后自动登录。用map就不能直接将这两个事件汇在一起最后接收处理。
这个时候就可以用在使用一个flatMap,它对注册的结果对象作为参数进行登录请求,再转换成登录结果对象。
这样两个事件被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。
Observable.just(getUserInfo())
.flatMap(new Function<UserInfo, ObservableSource<HttpResult>>() {
@Override
public ObservableSource<HttpResult> apply(UserInfo userInfo) throws Exception {
HttpResult httpResult = httpRequest.getRegisteResult(userInfo.getName(),
userInfo.getPass()).execute().body();
return Observable.just(httpResult);
}
})
.flatMap(new Function<HttpResult, ObservableSource<LoginResult>>() {
@Override
public ObservableSource<LoginResult> apply(HttpResult httpResult) throws Exception {
if(httpResult.getResult().equals("used")){
UserInfo userInfo = getUserInfo();
LoginResult loginResult = httpRequest.getLoginResult(userInfo.getName(),
userInfo.getPass()).execute().body();
return Observable.just(loginResult);
}
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<LoginResult>() {
@Override
public void accept(LoginResult loginResult) throws Exception {
mText.setText(loginResult.getResult());
}
});
这样就可以实现注册完直接登录的需求。把注册和登录这两个事件平铺完之后统一发放下去
我们创建Retrofit对象的时候添加了和Rxjava的适配,有什么作用呢?
addCallAdapterFactory(RxJavaCallAdapterFactory.create())
它能让Retrofit和Rxjava能够无缝适配,让我们更专注于事件的走向。比如之前的代码可以变的更简洁
(1)把请求接口方法返回值改为Observable
@GET("zhuce/rxjava.php/")
Observable<HttpResult> getRegisteResult(@Query("name") String name, @Query("pass") String pass);
(2)只需要进行请求,它会直接返回一个Observable对象,不需要自己去创建该对象并返回,不要太方便
Observable.just(getUserInfo())
.flatMap(new Function<UserInfo, ObservableSource<HttpResult>>() {
@Override
public ObservableSource<HttpResult> apply(UserInfo userInfo) throws Exception {
return httpRequest.getRegisteResult(userInfo.getName(),userInfo.getPass());
}
})
//....
(3)运行后抛出异常
Unable to create call adapter for io.reactivex.Observable<com.example.rxjavademo.bean.HttpResult>
adapter-rxjava目前还不支持Rxjava 2.x。但是jakewharton大神自己写了一个库让Retrofit来支持Rxjava 2.x
(4)引入库
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
将设配工厂代码改为
addCallAdapterFactory(RxJava2CallAdapterFactory.create())
再次运行不再抛异常
过滤操作符
过滤操作符的目的是在一系列的Observable中过滤一些事件后再发出。
过滤操作符有:Debounce、Filter、Distinct、First、Take.....
比如经常看见的一个需求:在文本框输入关键字,对关键字进行过滤搜索(可以联想淘宝搜索框)。
普通的做法是,通过一个文本框监听文本内容的变化后进行网络请求,再展示返回的数据
mEdSearch.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
//文本变化 --请求网络---更新结果
}
@Override
public void afterTextChanged(Editable s) {
}
});
这会有两个问题:
- 往往我们需要的是一定量的关键字再过滤,而不是文本内容一变化就搜索,这会造成很多流量的浪费
- 有可能因为网络的波动,会引起第一次搜索结果晚于第二次搜索结果到达,结果不可控。
通过过滤操作符可以解决以上两个问题:
(1)引入RxBinding库
这个实例需要引入RxBinding库
implementation 'com.jakewharton.rxbinding3:rxbinding:3.0.0-alpha2'
(2)使用过滤操作符的debounce
过滤操作符debounce.png过滤操作符debounce 它会在定义的时间内监测最后的变化结果。
比如下面的例子表示它会获取在监测到文本变化后 2000ms 内的最后一次结果
RxTextView.textChanges(mEdSearch).debounce(2000,TimeUnit.MILLISECONDS)
(3)使用过滤操作符filter
过滤操作符filter.png过滤操作符filter,它会根据你的过滤规则返回一个布尔值,如果为false就不会将事件传递下去
比如下面的例子表示只有当输入的文本内容不为空,才可以继续之后的事件
.filter(new Predicate<CharSequence>() {
@Override
public boolean test(CharSequence charSequence) throws Exception {
return charSequence.toString().length()>0;
}
})
(4)获取网络请求过滤的结果
switchMap(new Function<CharSequence, ObservableSource<List<String>>>() {
@Override
public ObservableSource<List<String>> apply(CharSequence charSequence) throws Exception {
//模拟网络请求过滤的结果
List<String> list = new ArrayList<>();
list.add("abc");
list.add("abcc");
return Observable.just(list);
}
})
(5)把事件结果传递给 Subscribe
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> stringList) throws Exception {
StringBuilder str = new StringBuilder();
for(String s : stringList){
str.append(s+"\n");
}
mText.setText(str.toString());
}
});
完整代码如下
RxTextView.textChanges(mEdSearch).debounce(2000,TimeUnit.MILLISECONDS)
.subscribeOn(AndroidSchedulers.mainThread()) //UI操作要在主线程
.filter(new Predicate<CharSequence>() {
@Override
public boolean test(CharSequence charSequence) throws Exception {
return charSequence.toString().length()>0;
}
})
.subscribeOn(Schedulers.io()) //swtichmap里的网络请求放在io线程
.switchMap(new Function<CharSequence, ObservableSource<List<String>>>() {
@Override
public ObservableSource<List<String>> apply(CharSequence charSequence) throws Exception {
//模拟网络请求过滤的结果
List<String> list = new ArrayList<>();
list.add("abc");
list.add("abcc");
return Observable.just(list);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> stringList) throws Exception {
StringBuilder str = new StringBuilder();
for(String s : stringList){
str.append(s+"\n");
}
mText.setText(str.toString());
}
});
总结
本篇博客笔者介绍了变化操作符和过滤操作符中的几个,想学习其他的用法,可以传送到Rxjava的中文文档
英文好的可以传送到官方文档
由于操作符太多,写着写着发现才写了两类操作符的几个,文章就有点过长了。还是需要再来一篇博客写剩下的操作符,可能还需要两篇[啊哈哈]
网友评论