RxJava2 实战系列文章
RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通知 UI 更新
RxJava2 实战知识梳理(2) - 计算一段时间内数据的平均值
RxJava2 实战知识梳理(3) - 优化搜索联想功能
RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯
RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作
RxJava2 实战知识梳理(6) - 基于错误类型的重试请求
RxJava2 实战知识梳理(7) - 基于 combineLatest 实现的输入表单验证
RxJava2 实战知识梳理(8) - 使用 publish + merge 优化先加载缓存,再读取网络数据的请求过程
RxJava2 实战知识梳理(9) - 使用 timer/interval/delay 实现任务调度
RxJava2 实战知识梳理(10) - 屏幕旋转导致 Activity 重建时恢复任务
RxJava2 实战知识梳理(11) - 检测网络状态并自动重试请求
RxJava2 实战知识梳理(12) - 实战讲解 publish & replay & share & refCount & autoConnect
RxJava2 实战知识梳理(13) - 如何使得错误发生时不自动停止订阅关系
RxJava2 实战知识梳理(14) - 在 token 过期时,刷新过期 token 并重新发起请求
RxJava2 实战知识梳理(15) - 实现一个简单的 MVP + RxJava + Retrofit 应用
一、示例
1.1 应用场景
几乎每个应用程序都提供了搜索功能,某些应用还提供了搜索联想。对于一个搜索联想功能,最基本的实现流程为:客户端通过EditText
的addTextChangedListener
方法监听输入框的变化,当输入框发生变化之后就会回调afterTextChanged
方法,客户端利用当前输入框内的文字向服务器发起请求,服务器返回与该搜索文字关联的结果给客户端进行展示。
在该场景下,有几个可以优化的方面:
- 在用户连续输入的情况下,可能会发起某些不必要的请求。例如用户输入了
abc
,那么按照上面的实现,客户端就会发起a
、ab
、abc
三个请求。 - 当搜索词为空时,不应该发起请求。
- 如果用户依次输入了
ab
和abc
,那么首先会发起关键词为ab
请求,之后再发起abc
的请求,但是abc
的请求如果先于ab
的请求返回,那么就会造成用户期望搜索的结果为abc
,最终展现的结果却是和ab
关联的。
1.2 示例代码
这里,我们针对上面提到的三个问题,使用RxJava2
提供的三个操作符进行了优化:
- 使用
debounce
操作符,当输入框发生变化时,不会立刻将事件发送给下游,而是等待200ms
,如果在这段事件内,输入框没有发生变化,那么才发送该事件;反之,则在收到新的关键词后,继续等待200ms
。 - 使用
filter
操作符,只有关键词的长度大于0
时才发送事件给下游。 - 使用
switchMap
操作符,这样当发起了abc
的请求之后,即使ab
的结果返回了,也不会发送给下游,从而避免了出现前面介绍的搜索词和联想结果不匹配的问题。
public class SearchActivity extends AppCompatActivity {
private EditText mEtSearch;
private TextView mTvSearch;
private PublishSubject<String> mPublishSubject;
private DisposableObserver<String> mDisposableObserver;
private CompositeDisposable mCompositeDisposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_search);
mEtSearch = (EditText) findViewById(R.id.et_search);
mTvSearch = (TextView) findViewById(R.id.tv_search_result);
mEtSearch.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
}
@Override
public void afterTextChanged(Editable s) {
startSearch(s.toString());
}
});
mPublishSubject = PublishSubject.create();
mDisposableObserver = new DisposableObserver<String>() {
@Override
public void onNext(String s) {
mTvSearch.setText(s);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
mPublishSubject.debounce(200, TimeUnit.MILLISECONDS).filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return s.length() > 0;
}
}).switchMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String query) throws Exception {
return getSearchObservable(query);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(mDisposableObserver);
mCompositeDisposable = new CompositeDisposable();
mCompositeDisposable.add(mDisposableObserver);
}
private void startSearch(String query) {
mPublishSubject.onNext(query);
}
private Observable<String> getSearchObservable(final String query) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
Log.d("SearchActivity", "开始请求,关键词为:" + query);
try {
Thread.sleep(100 + (long) (Math.random() * 500));
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
Log.d("SearchActivity", "结束请求,关键词为:" + query);
observableEmitter.onNext("完成搜索,关键词为:" + query);
observableEmitter.onComplete();
}
}).subscribeOn(Schedulers.io());
}
@Override
protected void onDestroy() {
super.onDestroy();
mCompositeDisposable.clear();
}
}
运行结果为:
二、示例解析
下面,我们就来详细的介绍一下这个例子中应用到的三种操作符
2.1 debounce
debounce
的原理图如下所示:
debounce
原理类似于我们在收到请求之后,发送一个延时消息给下游,如果在这段延时时间内没有收到新的请求,那么下游就会收到该消息;而如果在这段延时时间内收到来新的请求,那么就会取消之前的消息,并重新发送一个新的延时消息,以此类推。
而如果在这段时间内,上游发送了onComplete
消息,那么即使没有到达需要等待的时间,下游也会立刻收到该消息。
2.2 filter
filter
的原理图如下所示:
filter
的原理很简单,就是传入一个Predicate
函数,其参数为上游发送的事件,只有该函数返回true
时,才会将事件发送给下游,否则就丢弃该事件。
2.3 switchMap
switchMap
的原理是将上游的事件转换成一个或多个新的Observable
,但是有一点很重要,就是如果在该节点收到一个新的事件之后,那么如果之前收到的时间所产生的Observable
还没有发送事件给下游,那么下游就再也不会收到它发送的事件了。
如上图所示,该节点先后收到了红、绿、蓝三个事件,并将它们映射成为红一、红二、绿一、绿二、蓝一、蓝二,但是当蓝一发送完事件时,绿二依旧没有发送事件,而最初绿色事件在蓝色事件之前,那么绿二就不会发送给下游。
更多文章,欢迎访问我的 Android 知识梳理系列:
- Android 知识梳理目录:http://www.jianshu.com/p/fd82d18994ce
- 个人主页:http://lizejun.cn
- 个人知识总结目录:http://lizejun.cn/categories/
网友评论
PublishSubject<String> objectPublishSubject = PublishSubject.create();
objectPublishSubject
.debounce(200, TimeUnit.MILLISECONDS,AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return s.length() > 0;
}
})
.switchMap(new Function<String, ObservableSource<AllJingJianBean>>() {
@Override
public ObservableSource<AllJingJianBean> apply(String s) throws Exception {
//封装请求实体类
GetJingJianRequestBean getJingJianRequestBean = new GetJingJianRequestBean();
getJingJianRequestBean.setDateParam(dateParam);
getJingJianRequestBean.setPage(String.valueOf(page));
getJingJianRequestBean.setSize(String.valueOf(size));
if (query != null) {
getJingJianRequestBean.setQuery(query);
}
//请求
Observable<AllJingJianBean> observable = createApiServiceWithTokenHeader().getTodayJingJian(ApiConfig.GET_JING_JIAN, getJingJianRequestBean).map(new HttpFunction<AllJingJianBean>()).subscribeOn(Schedulers.io());
return observable;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
LogUtils.e("开始搜索 - "+query);
objectPublishSubject.onNext(query);
mCompositeDisposable.add(mCompositeDisposable);
主席,为什么add自己啊
2、 mCompositeDisposable = new CompositeDisposable();
mCompositeDisposable.add(mCompositeDisposable);
3、这几行看不懂~~