all
/**
* 判断发送的每项数据是否都满足 设置的函数条件
*/
private void useAll() {
Observable.just(1, 2, 3, 4, 5)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer <= 10;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
KLog.d(TTAG, "结果是" + aBoolean);
}
});
}
TakeWhile
// 1. 每1s发送1个数据 = 从0开始,递增1,即0、1、2、3
Observable.interval(1, TimeUnit.SECONDS)
// 2. 通过takeWhile传入一个判断条件
.takeWhile(new Predicate<Long>() {
@Override
public boolean test(Long integer) throws Exception {
return (integer < 3);
// 当发送的数据满足<3时,才发送Observable的数据
}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
KLog.d(TTAG, "发送了事件 " + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
skipWhile
// 1. 每隔1s发送1个数据 = 从0开始,每次递增1
Observable.interval(1, TimeUnit.SECONDS)
// 2. 通过skipWhile()设置判断条件
.skipWhile(new Predicate<Long>() {
@Override
public boolean test(Long aLong) throws Exception {
return (aLong < 5);
// 直到判断条件不成立 = false = 发射的数据≥5,才开始发送数据
}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
KLog.d(TTAG, "发送了事件 " + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
sequenceEqual
Observable.sequenceEqual(
Observable.just(4, 5, 6),
Observable.just(4, 5, 6)
)
.subscribe(aBoolean -> {
KLog.d(TTAG, "2个Observable是否相同:" + aBoolean);
// 输出返回结果
});
contains
/**
* 判断发送的数据中是否包含指定数据
* <p>
* 若包含,返回 true;否则,返回 false
* 内部实现 = exists()
*/
private void contains() {
Observable.just(1, 2, 3, 4, 5, 6)
.contains(4)
.subscribe(aBoolean -> {
KLog.d(TTAG, "result is " + aBoolean);
// 输出返回结果
});
}
网友评论