Reference article: https://www.jianshu.com/p/031745744bfa
1. Preparation
Install the dependencies
compile 'io.reactivex.rxjava2:rxjava:2.0.2'
compile 'io.reactivex.rxjava2:rxandroid:2.0.2'
2. Usage
Create an Observable for button clicks, with String as the type of the items it emits
Observable.create(new ObservableOnSubscribe<String>() {})
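Implement the subscribe method: attach a click listener to the mainButton2 button that emits the text of the mainEdit1 input box, and remove the listener from mainButton2 when the emitter is cancelled, to prevent a memory leak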
public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
mainButton2.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
emitter.onNext(mainEdit1.getText().toString());
}
});
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
mainButton2.setOnClickListener(null);
}
});
}
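The register-then-clean-up pattern above can be modelled without Android or RxJava. The following is a minimal plain-Java sketch, not RxJava's implementation: FakeButton, bridge, and the returned Runnable are hypothetical stand-ins for the view, the body of subscribe, and the Cancellable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ListenerBridgeSketch {
    /** Hypothetical stand-in for a view that holds at most one click listener. */
    public static final class FakeButton {
        private Runnable listener;
        public void setOnClickListener(Runnable l) { listener = l; }
        public boolean hasListener() { return listener != null; }
        public void click() { if (listener != null) listener.run(); }
    }

    /**
     * Registers a listener that forwards the current text downstream and
     * returns a cancel action that clears the listener again.
     */
    public static Runnable bridge(FakeButton button, Supplier<String> text,
                                  Consumer<String> downstream) {
        button.setOnClickListener(() -> downstream.accept(text.get()));
        return () -> button.setOnClickListener(null);
    }

    /** One click while subscribed, then one click after cancelling. */
    public static List<String> demo() {
        FakeButton button = new FakeButton();
        List<String> received = new ArrayList<>();
        Runnable cancel = bridge(button, () -> "rx", received::add);
        button.click();   // forwarded downstream
        cancel.run();     // listener removed, button no longer references the lambda
        button.click();   // ignored
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [rx]
    }
}
```

Once the cancel action has run, the button holds no reference to the listener, so nothing the listener captured is kept alive through the view.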
Create an observer and subscribe it to the button-click Observable
searchTextObservable
.subscribe(new Consumer<String>() {
@Override
public void accept(String result) throws Exception {
showResult(result);
}
});
}
Simulate a time-consuming task: sleep for 2 s, then return a list containing only the text from the input box
private static class SearchEngine {
ArrayList<String> strings;
private List<String> search(String txt) {
strings = new ArrayList<>();
strings.add(txt);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return strings;
}
}
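Use the filter operator, which takes a Predicate implementation, to drop items that fail a condition (here, text of length 1 or less), and the debounce operator to emit an item only after 2 s have passed without a newer one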
buttonClickObservable
.filter(new Predicate<String>() {
@Override
public boolean test(@NonNull String s) throws Exception {
return s.length() > 1;
}
})
.debounce(2, TimeUnit.SECONDS);
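To make the effect of filter followed by debounce concrete, here is a minimal plain-Java model that works on timestamped values instead of real timers. It is a sketch of the semantics only, not RxJava's implementation; the Timed type and the filterThenDebounce helper are hypothetical names, and the last item is treated as emitted on completion.

```java
import java.util.ArrayList;
import java.util.List;

public class DebounceSketch {
    /** Hypothetical helper type: a value together with its emission time. */
    public static final class Timed {
        public final long atMillis;
        public final String value;
        public Timed(long atMillis, String value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    /**
     * Keeps values longer than one character, then keeps only those that are
     * followed by a gap of at least windowMillis (or that are last).
     * Events are assumed to be sorted by time.
     */
    public static List<String> filterThenDebounce(List<Timed> events, long windowMillis) {
        List<Timed> kept = new ArrayList<>();
        for (Timed e : events) {
            if (e.value.length() > 1) {   // same condition as the Predicate above
                kept.add(e);
            }
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < kept.size(); i++) {
            boolean isLast = i == kept.size() - 1;
            if (isLast || kept.get(i + 1).atMillis - kept.get(i).atMillis >= windowMillis) {
                out.add(kept.get(i).value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Timed> typed = new ArrayList<>();
        typed.add(new Timed(0, "r"));       // dropped by the filter
        typed.add(new Timed(300, "rx"));    // superseded 300 ms later
        typed.add(new Timed(600, "rxj"));   // followed by a 3400 ms pause
        typed.add(new Timed(4000, "rxja")); // last item
        System.out.println(filterThenDebounce(typed, 2000)); // prints [rxj, rxja]
    }
}
```

With a 2 s window, fast typing produces a single search for the text the user paused on rather than one search per keystroke.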
Next, create an Observable for text changes in the input box
private Observable<String> createTextChangeObservable() {
Observable<String> textChangeObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
final TextWatcher watcher = new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {}
@Override
public void afterTextChanged(Editable s) {}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
emitter.onNext(s.toString());
}
};
mainEdit1.addTextChangedListener(watcher);
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
mainEdit1.removeTextChangedListener(watcher);
}
});
}
});
return textChangeObservable
.filter(new Predicate<String>() {
@Override
public boolean test(@NonNull String s) throws Exception {
return s.length() > 1;
}
})
.debounce(2, TimeUnit.SECONDS);
}
Use the merge operator to combine the two Observables above
Observable<String> searchTextObservable = createButtonClickObservable();
Observable<String> textChangeObservable = createTextChangeObservable();
Observable<String> observable = Observable.merge(searchTextObservable,textChangeObservable);
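merge forwards items from both sources in the order they arrive, without grouping them by source. A minimal plain-Java model of that interleaving, assuming each emission carries a timestamp; the Timed type and the merge helper here are hypothetical and are not RxJava's API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class MergeSketch {
    /** Hypothetical helper type: a value together with its emission time. */
    public static final class Timed {
        public final long atMillis;
        public final String value;
        public Timed(long atMillis, String value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    /** Interleaves two streams by arrival time; ties keep the first stream first. */
    public static List<String> merge(List<Timed> first, List<Timed> second) {
        List<Timed> all = new ArrayList<>(first);
        all.addAll(second);
        // List.sort is stable, so equal timestamps preserve insertion order.
        all.sort(Comparator.comparingLong((Timed t) -> t.atMillis));
        List<String> out = new ArrayList<>();
        for (Timed t : all) {
            out.add(t.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Timed> clicks = new ArrayList<>();
        clicks.add(new Timed(1000, "click:rx"));
        List<Timed> textChanges = new ArrayList<>();
        textChanges.add(new Timed(500, "text:rx"));
        textChanges.add(new Timed(1500, "text:rxj"));
        System.out.println(merge(clicks, textChanges));
        // prints [text:rx, click:rx, text:rxj]
    }
}
```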
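Switch threads
First switch to the main thread and show the progress bar (debounce delivers its items on the computation scheduler by default, so this switch is required before touching the UI)
Then switch to a worker thread to run the time-consuming search
Then switch back to the main thread, hide the progress bar, and show the search results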
mDisposable = observable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
showProgress();
}
})
.observeOn(Schedulers.io())
.map(new Function<String, List<String>>() {
@Override
public List<String> apply(@NonNull String query) throws Exception {
return mSearchEngine.search(query);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> result) throws Exception {
hideProgress();
showResult(result);
}
});
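The main, worker, main hand-off in the chain above can be mimicked in plain Java to see which thread runs each step. This is only an analogy built on CompletableFuture, assuming two single-thread executors named "ui" and "io" as stand-ins for AndroidSchedulers.mainThread() and Schedulers.io(); it does not show how RxJava schedules work internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHopSketch {
    /** Runs the three steps and returns a log of which thread ran each one. */
    public static List<String> run(String query) {
        // Hypothetical stand-ins for the main thread and the I/O scheduler.
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                .supplyAsync(() -> {
                    // Step 1: UI work, like showProgress()
                    log.add("showProgress on " + Thread.currentThread().getName());
                    return query;
                }, ui)
                .thenApplyAsync(q -> {
                    // Step 2: slow work off the UI thread, like search(query)
                    log.add("search on " + Thread.currentThread().getName());
                    return Collections.singletonList(q);
                }, io)
                .thenAcceptAsync(result -> {
                    // Step 3: back on the UI thread, like hideProgress() + showResult()
                    log.add("showResult " + result + " on " + Thread.currentThread().getName());
                }, ui)
                .join(); // wait for the whole chain to finish
        } finally {
            ui.shutdown();
            io.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        for (String line : run("rx")) {
            System.out.println(line);
        }
    }
}
```

Each stage names the executor it runs on, much as each observeOn call in the chain decides where the operators after it run.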
Dispose of the subscription in onStop
@Override
protected void onStop() {
super.onStop();
// Guard against onStop running before a subscription was ever created
if (mDisposable != null && !mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
Written on 2018-4-4