1.Map操作符
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {//Integer转换为String
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
上游我们发送的是数字类型,下游接受的是String类型,中间起转换作用的就是map操作符(上游发来的事件转换为任意类型)。
运行结果:
D/TAG: This is result 1
D/TAG: This is result 2
D/TAG: This is result 3
2.FlatMap
FlatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里,注:concatMap
和FlatMap
一样的作用,只是contactMap
是有序的,FlatMap
是无序的
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
打印如下:
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 2
D/TAG: I am value 2
D/TAG: I am value 2
3.concatWith 链接操作符
Flowable.just(3, 2, 4).concatWith(Flowable.just(100, 200))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("concatWith, value:" + integer);
}
});
打印如下:
concatWith, value:3
concatWith, value:2
concatWith, value:4
concatWith, value:100
concatWith, value:200
4.filter 过滤操作符
![](https://img.haomeiwen.com/i15431408/bfc741bf07f41d0e.png)
会过滤掉值小于10的项
Flowable.just("item2", "item1", "item7", "item8", "item9").filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return s.equals("item1") || s.equals("item7");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("s = " + s);
}
});
}
打印如下:
s = item1
s = item7
5.distinct 去重操作
发射的数据与之前发射的数据相比,如果相同就丢掉。
![](https://img.haomeiwen.com/i15431408/cfa7bc98b522e097.png)
Flowable.just("item1", "item2", "item2", "item3").distinct().subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("s:" + s);
}
});
输出结果:
s:item1
s:item2
s:item3
6.distinctUntilChanged
区分每一个被发出项目与前一个紧邻的项不同,如果相同,就不会发射,而distinct会丢弃所有相同的重复项,不管是不是紧邻着发射的。
Flowable.just("item2", "item1", "item2", "item2", "item3").distinctUntilChanged().subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("s:" + s);
}
});
打印如下:
s:item2
s:item1
s:item2
s:item3
7.mergeWith合并操作符
将两个观察源合并成一个
var flowable= Observable.just(3.toLong()).delay(2, TimeUnit.SECONDS)
Observable.intervalRange(4, 6, 1, 1, TimeUnit.SECONDS).mergeWith(flowable).subscribe{
Log.e(TAG, "merge:"+it.toString())
}
打印如下:
merge:4
merge:5
merge:3
merge:6
merge:7
merge:8
merge:9
注:concat与merge区别:
1.merge 将全部订阅 Observable,但是谁先完成谁先通知,如果大家完成时间一样,按顺序调用
2.concat会在第一个Observable调用onComplete后才能订阅下一个 Observable,(顺序调用)
- 对于contact,有一点务必要注意,那就是上一个数据源不可用,请务必subscriber.onCompleted(); 这样才能继续走下一个数据源。
8. debounce 去抖操作符
当调用函数N秒后,才会执行函数中动作,若在这N秒内又重复调用该函数则将取消前一次调用,并重新计算执行时间。
这个debounce在js常被使用,比如界面根据用户输入做ajax请求局部刷新页面,那么势必会重复请求接口,而实际可能在n秒内用户并没有完成输入,那么频繁调函数肯定是不准确的,那么可以设定一个debounce time,在n秒内的调用都是无效的。
Observable.just(1,2,3,4).debounce(1,TimeUnit.MILLISECONDS).
subscribe{
Log.e(TAG,it.toString())
}
打印如下:
4
使用场景
现在几乎所有的 App 都有搜索功能 , 一般情况我们监听 EditText 控件,当值发生改变去请求搜索接口. 如:
etKey.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
}
@Override
public void afterTextChanged(Editable s) {
String key = etKey.getText().toString().trim();
if (key.length() > 0){
search(key); // 请求搜索接口,成功后把结果显示到界面上.
}
}
});
这样做有两个问题:
可能导致很多没有意义的请求,耗费用户流量(因为控件的值每更改一次立即就会去请求网络,而且只是最后输入的关键字是有用的)
可能导致最终搜索的结果不是用户想要的. 例如,用户一开始输入关键字 AB 这个时候出现两个请求, 一个请求是 A 关键字, 一个请求是 AB 关键字. 表面上是 A 请求先发出去, AB 请求后发出去. 如果后发出去的 AB 请求先返回, A 请求后返回,那么 A 请求后的结果将会覆盖 AB 请求的结果. 从而导致搜索结果不正确.
如何解决问题?
subscription = RxTextView.textChanges(etKey)
.debounce(400, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
//对用户输入的关键字进行过滤
.filter(new Func1<CharSequence, Boolean>() {
@Override
public Boolean call(CharSequence charSequence) {
Log.d("RxJava", "filter is main thread : " + (Looper.getMainLooper() == Looper.myLooper()));
return charSequence.toString().trim().length() > 0;
}
})
.flatMap(new Func1<CharSequence, Observable<List<String>>>() {
@Override
public Observable<List<String>> call(CharSequence charSequence) {
Log.d("RxJava", getMainText("flatMap"));
return searchApi.search(charSequence.toString());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<String>>() {
@Override
public void call(List<String> strings) {
tvContent.setText("search result:\n\n");
tvContent.append(strings.toString());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
tvContent.append("Error:" + throwable.getMessage());
}
});
上面代码的主要逻辑:
使用 debounce 操作符设置: 只有当用户输入关键字后 400 毫秒才发射数据(说的直白点就是 400 毫秒后才会走后面的逻辑)
使用 filter 操作符 对用户输入的关键字进行过滤:只有输入的关键字不为空,才会走后面的逻辑;
使用 flatMap 操作符:使用最终的关键字去请求搜索接口
至此,避免 EditText 每改变一次就请求一次的情况。
但是,还有一个问题,上面说的导致搜索结果的错乱,上面的代码还是没有解决,比如停止输入 400 毫秒后, 那么肯定会开始请求 Search 接口, 但是用户又会输入新的关键字,这个时候上个请求还没有返回, 新的请求又去请求 Search 接口.这个时候有可能最后的一个请求返回, 第一个请求最后返回,导致最终显示的结果是第一次搜索的结果.
怎么去解决这个问题:可以使用 switchMap
操作符解决。
switchMap
操作符和 flatMap
操作符差不多,区别是 switchMap
操作符只会发射(emit)最近的 Observables。
也就是说,当 400 毫秒后,发出第一个搜索请求,当这个请求的过程中,用户又去搜索了,发出第二个请求,不管怎样,switchMap 操作符只会发射第二次请求的 Observable。所以,在上面的代码基础上把 flatMap 改成 switchMap
就可以了。
9.take()
控制观察者接收的事件的数量
Observable.interval(0,1,TimeUnit.SECONDS)
.take(60)
.subscribe(object : Observer<Long> {
override fun onNext(integer: Long) {
Log.e(TAG, "==================onNext $integer")
}
override fun onComplete() {
Log.e(TAG, "==================onComplete ")
}
override fun onSubscribe(d: Disposable) {
Log.e(TAG, "==================onSubscribe ")
}
override fun onError(e: Throwable) {
Log.e(TAG, "==================onSubscribe ")
}
})
如上代码可以实现一个短信验证码倒计时的功能。
10.takeUntil
可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了。
Observable.just(1,2,3,4,5,6).takeUntil {
it<3
}.subscribe { Log.e(TAG,"takeUntil:${it}") }
打印结果:
takeUntil:1
11.concat 组合操作符
讲多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat()最多只可以发送4个事件。
Observable.concat(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8))
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印如下:
================onNext 1
================onNext 2
================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8
12.retry
如果出现错误事件,则会重新发送所有事件序列。times是代表重新发的次数。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Exception("404"));
}
})
.retry(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印如下:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onError
13.retryWhen()
当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送Error事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件。
Observable.create(new ObservableOnSubscribe < String > () {
@Override
public void subscribe(ObservableEmitter < String > e) throws Exception {
e.onNext("chan");
e.onNext("ze");
e.onNext("de");
e.onError(new Exception("404"));
e.onNext("haha");
}
})
.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
@Override
public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
@Override
public ObservableSource <? > apply(Throwable throwable) throws Exception {
if(!throwable.toString().equals("java.lang.Exception: 404")) {
return Observable.just("可以忽略的异常");
} else {
return Observable.error(new Throwable("终止啦"));
}
}
});
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "==================onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印如下:
==================onSubscribe
==================onNext chan
==================onNext ze
==================onNext de
==================onError java.lang.Throwable: 终止啦
将onError(new Exception("404")) 改为onError(new Exception("303")) 看看打印结果:
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
......
14.concatDelayError()
描述:在使用concat()时,如果其中一个观察者发出了error事件,则其他所有的观察者事件都会停止。而concatDelayError()就是为了解决这一问题,使用此操作符,即使其中一个观察者发出了error事件,其他事件会继续发送,直到所有事件发送完毕,才出发onError。
Observable.concatArrayDelayError(
//从1开始,共发送3个,延时1s发送第一个,之后间隔1s发送余下数字
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new NullPointerException());
emitter.onComplete();
}
}),
//从11开始,共发送3个,延时1s发送第一个,之后间隔1s发送余下数字
Observable.just(3, 4, 5))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG,"onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG,"onNext:"+integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG,"onError");
}
@Override
public void onComplete() {
Log.i(TAG,"onComplete");
}
});
15.zip()
会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。
Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
Log.d(TAG, "===================A 发送的事件 " + s1);
return s1;
}}),
Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
Log.d(TAG, "===================B 发送的事件 " + s2);
return s2;
}
}),
new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
String res = s + s2;
return res;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "===================onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
上面代码中有两个 Observable,第一个发送事件的数量为5个,第二个发送事件的数量为6个。现在来看下打印结果:
05-22 09:10:39.952 5338-5338/com.example.rxjavademo D/chan: ===================onSubscribe
05-22 09:10:40.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
05-22 09:10:40.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
===================onNext A1B1
05-22 09:10:41.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
05-22 09:10:41.954 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
===================onNext A2B2
05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
05-22 09:10:42.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================onNext A3B3
05-22 09:10:43.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
05-22 09:10:43.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
05-22 09:10:43.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A4B4
05-22 09:10:44.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A5
05-22 09:10:44.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
05-22 09:10:44.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A5B5
===================onComplete
可以发现最终接收到的事件数量是5,那么为什么第二个 Observable 没有发送第6个事件呢?因为在这之前第一个 Observable 已经发送了 onComplete 事件,所以第二个 Observable 不会再发送事件。
网友评论