(1) all
判断发送的每项数据是否都满足,如果都满足则返回ture,否则返回false;
Observable.just(1, 2, 3, 4, 5, 6)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return true;
}
})
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d("aaa", String.valueOf(aBoolean));
}
});
返回结果:true
(2)takeWhile
不停的发射数据,每条数据都需要判断是否满足条件,如果满足条件则继续发送,否则立即终止发射。
Observable.just(1, 2, 3, 4, 5, 6)
.takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if(integer == 1){
return true;
}
if(integer == 2){
return false;
}
if(integer == 3){
return true;
}
if(integer == 4){
return false;
}
return true;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("aaa", String.valueOf(integer));
}
});
返回结果:1
(3)skipWhile
数据发射之前会条件判断,知道判断结果=false
之后才开始发射数据,之后如果发射的数据的判断条件结果=true,则照样发射数据。
Observable.just(1, 2, 3, 4, 5, 6)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if(integer > 4){
return false;
}
if(integer == 6){
return true;
}
return true;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("aaa", String.valueOf(integer));
}
});
返回结果:5 6
(4)takeUntil
不停的发射数据,如果发射的数据不满足条件,则继续发射数据,直到满足条件之后停止发射数据。
Observable.just(1, 2, 3, 4, 5, 6)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if(integer == 4){
return true;
}
return false;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("aaa", String.valueOf(integer));
}
});
返回结果:1 2 3 4
(5)skipUntil
直到skipUntil
传入的Observable
开始发送数据时才开始发送。
Observable.interval(1, TimeUnit.SECONDS)
.skipUntil(Observable.timer(5, TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long mlong) throws Exception {
Log.d("aaa", String.valueOf(mlong));
}
});
返回结果:
图片.png(6)SequenceEqual
判断两个Observable的数据是否相同,如果相同返回true,否则返回false。
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just("1", "2", "3"))
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d("aaa", String.valueOf(aBoolean));
}
});
返回结果:false
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d("aaa", String.valueOf(aBoolean));
}
});
返回结果:true
(7)contains
当Observable中包含指定的数据时,返回true,否则返回false。
.just(1, 2, 3)
.contains(4)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d("aaa", String.valueOf(aBoolean));
}
});
返回结果:false
(8)isEmpty
判断发送的数据是否为空。
Observable.empty()
.isEmpty()
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d("aaa", String.valueOf(aBoolean));
}
});
结果返回:true
Observable.just(1, 2, 3, 4, 5)
.isEmpty()
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d("aaa", String.valueOf(aBoolean));
}
});
结果返回:false
(9)amb
当需要发送多个 Observable时,只发送先发送数据的Observable的数据,而其余 Observable则被丢弃。
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1, 2));
list.add(Observable.just(3, 4));
list.add(Observable.just(5, 6));
list.add(Observable.just(7, 8));
Observable
.amb(list)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("aaa", String.valueOf(integer));
}
});
Observable.ambArray(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("aaa", String.valueOf(integer));
}
});
以上两种写法都可以
返回结果:1 2
(10)defaultIfEmpty
在不发送任何有效事件( Next事件)、仅发送了Complete 事件的前提下,发送一个默认值
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onComplete();
}
})
.defaultIfEmpty(10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("aaa", String.valueOf(integer));
}
});
结果返回:10
网友评论