本文参考RxJava2 只看这一篇文章就够了,强烈推荐大家去看一下。
RxJava的组成
- 被观察者-------Observable
- 观察者-----------Observer
- 订阅---------------subscribe
//被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("a");
emitter.onNext("b");
emitter.onNext("c");
emitter.onComplete();
}
});
//观察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
LogUtils.e(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//订阅
observable.subscribe(observer);
常见操作符
创建操作符
- create()
//创建一个被观察者
Observable<String>observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello Java");
emitter.onComplete();
}
});
- just() --------发送事件不可以超过10个
Observable.just(1,2,3,4,5,6,7,8,9,0).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e(integer);
}
});
- From 操作符
- fromArray() ----- 可以发送数组(数量可以大于10个)
Integer integers[] = {0,1,2,3,4,5};
Observable.fromArray(integers).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e(integer);
}
});
- fromCallable() -----被观察者返回一个结果值给观察者
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "hello";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e(s);
}
});
- fromIterable() ---------可以发送一个List集合给观察者
List<String>list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
Observable.fromIterable(list).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e(s);
}
});
- fromFuture() ------- 可以发送一个Future
FutureTask<String>futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello";
}
});
//doOnSubscribe()----- 开始订阅时才会执行
Observable.fromFuture(futureTask).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
futureTask.run();//开始执行
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e(s);
}
});
- defer() -------------- 只有观察者订阅时,才会创建新的被观察者
String str_name = "张三";
public void rxjava() {
Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just(str_name);
}
});
str_name = "李四";
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
LogUtils.e("str_name = " + s);//王五,赵六
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
str_name = "王五";
observable.subscribe(observer);
str_name = "赵六";
observable.subscribe(observer);
}
- timer() -------------当到了指定时间就发送一个0L的值给观察者
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtils.e("along : "+aLong);//along : 0
}
});
- interval() ------------ 每隔一段时间就会发送一个事件(从0开始不断增加1的数字)
Observable.interval(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
//每隔2s 执行一次 along : 从0开始每次回增加1
LogUtils.e("along : "+aLong);//along : 0, along : 1
}
});
//延迟5s后开始执行
Observable.interval(5,2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
//每隔2s 执行一次 along : 从0开始每次回增加1
LogUtils.e("along : "+aLong);//along : 0, along : 1
}
});
- intervalRange() ------------ 可以指定发送事件的开始值,数量,其他的和interval()一样
//start:起始数值 -------- 10
//count:发射数量 -------- 3
//initialDelay:延迟执行时间-------- 5s
//period:发射周期时间------2s
//unit:时间单位
//数字从10开始,传递3次,第一次执行延迟5s,每隔2s执行一次
Observable.intervalRange(10,3,5,2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
//每隔2s 执行一次 along : 从0开始每次回增加1
LogUtils.e("along : "+aLong);//along : 10, along : 11, along : 12
}
});
//start:起始数值 -------- 10
//count:发射数量 -------- 3
//initialDelay:延迟执行时间-------- 5s
//period:发射周期时间------2s
//unit:时间单位
//Scheduler:线程调度
//数字从10开始,传递3次,第一次执行延迟5s,每隔2s执行一次,在新线程中执行
Observable.intervalRange(10,3,5,2, TimeUnit.SECONDS, Schedulers.newThread()).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtils.e("当前线程 :"+Thread.currentThread().getName());//RxNewThreadScheduler
//每隔2s 执行一次 along : 从0开始每次回增加1
LogUtils.e("along : "+aLong);//along : 10, along : 11, along : 12
}
});
- range() ----------- 发送一定范围内的事件
//从10开始,执行3次
Observable.range(10,3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("integer : "+integer);//10,11,12
}
});
- rangeLong() ----和range()方法类似,只是数据类型为Long
//从10开始,执行3次
Observable.rangeLong(10,3).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtils.e("along : "+aLong);//10,11,12
}
});
- empty() & never() & error()
- empty() --------------- 直接发送 onComplete() 事件
//只会进入 onSubscribe(),onComplete()方法
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.e("==================onSubscribe");
}
@Override
public void onNext(Object o) {
LogUtils.e("==================onNext");
}
@Override
public void onError(Throwable e) {
LogUtils.e("==================onError " + e);
}
@Override
public void onComplete() {
LogUtils.e("==================onComplete");
}
});
- never() ---------------- 不发生任何时间
//只会进入 onSubscribe()方法
Observable.never().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.e("==================onSubscribe");
}
@Override
public void onNext(Object o) {
LogUtils.e("==================onNext");
}
@Override
public void onError(Throwable e) {
LogUtils.e("==================onError " + e);
}
@Override
public void onComplete() {
LogUtils.e("==================onComplete");
}
});
- error() -------------------发送onError()事件
//只会进入 onSubscribe(),onError()方法
Observable.error(new NullPointerException()).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.e("==================onSubscribe");
}
@Override
public void onNext(Object o) {
LogUtils.e("==================onNext");
}
@Override
public void onError(Throwable e) {
LogUtils.e("==================onError " + e);
}
@Override
public void onComplete() {
LogUtils.e("==================onComplete");
}
});
转换操作符
- map() -------------- 将被观察者发送的数据类型转换成其他的类型
Observable.just(1,2,3,4,5).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "当前值为:"+integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e(s);
}
});
- flatMap() ----------------- 作用于map() 方法类似,返回一个 新的Observerable (无序: flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序 )
Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
list.add("当前值为:"+integer);
if (integer == 3) {
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//延迟10毫秒
} else {
return Observable.fromIterable(list);
}
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
LogUtils.e(o.toString());//当前值为:1,2,4,5,3 ------无序的
}
});
- concatMap() ------------ 作用和flatMap() 方法一样(有序:concatMap()转发事件的顺序是有序的)
Observable.just(1, 2, 3, 4, 5).concatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
list.add("当前值为:"+integer);
if (integer == 3) {
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//延迟10毫秒
} else {
return Observable.fromIterable(list);
}
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
LogUtils.e(o.toString());//当前值为:1,2,3,4,5, ------有序的
}
});
- buffer() ------------- 从需要发送的事件中获取一定数量的事件,将这些事件存放到缓冲区中一并发出
//count: 缓冲区元素的数量
//skip: 就代表缓冲区满了之后,发送下一次事件的时候要跳过的元素数量
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 2)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
LogUtils.e("缓冲区大小: " + integers.size());
String str = "";
for (int i = 0; i < integers.size(); i++) {
str = str + "," + integers.get(i);
}
LogUtils.e("当前元素: " + str);
}
});
- groupBy() ---------------- 将发送的数据进行分组,每个分组都会返回一个被观察者
Observable.just(1, 2, 3, 4, 5, 6, 7)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer % 2;//分为2组
}
})
.subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
@Override
public void accept(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
LogUtils.e(" 第" + integerIntegerGroupedObservable.getKey()+"组");
integerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("第" + integerIntegerGroupedObservable.getKey() + "组,当前元素: " + integer);
}
});
}
});
- scan() ---------------- 将数据按照一定的逻辑合并数据
Observable.just(1, 2, 3, 4, 5, 6, 7)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer1, Integer integer2) throws Exception {
LogUtils.e("integer1 = "+integer1);//上一次的结果
LogUtils.e("integer2 = "+integer2);
return integer1 +integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("integer1 + integer2 ="+integer);
}
});
- window() --------------将发送数据 按指定数量进行分组
Observable.just(1, 2, 3, 4, 5, 6, 7)
.window(3)
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("当前元素: " + integer);
}
});
}
});
组合操作符
- zip() -------------- 将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。
Observable.zip(
Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
LogUtils.d("A 发送的事件: " + s1);
return s1;
}
}), Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
LogUtils.d("B 发送的事件: " + s2);
return s2;
}
}), new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
String res = s + s2;
LogUtils.d("A & B 发送的事件: " + res);
return res;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.d( "onNext: " + s);
}
});
- concat() -------------- 将多个观察者组合在一起,然后按照之前的发送顺序发送事件,最多只能合并4个被观察者
Observable.concat(
Observable.just(1,2),
Observable.just(3,4),
Observable.just(5,6),
Observable.just(7,8)
).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("当前数字:"+integer);//1,2,3,4,5,6,7,8
}
});
- concatArray() ------------ 作用和concat()方法一样,可以发送多于4个的被观察者
Observable.concatArray(
Observable.just(1,2),
Observable.just(3,4),
Observable.just(5,6),
Observable.just(7,8),
Observable.just(9,10),
Observable.just(11,12)
).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("当前数字:"+integer);//1,2,3,4,5,6,7,8,9,10,11,12
}
});
- merge() -------------- 作用和concat() 方法一样,只不过concat()是串行发送,而merge() 是并行发送事件
Observable.merge(
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long,String>() {
@Override
public String apply(Long aLong) throws Exception {
return "A"+aLong;
}
}),
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long,String>() {
@Override
public String apply(Long aLong) throws Exception {
return "B"+aLong;
}
})
).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e(s);//A,B 交替出现---------- A0,B0,A1,B1,A2,B2
}
});
- combineLatest() ------------ 作用和zip()类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送。
Observable.combineLatest(
Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
LogUtils.d("A 发送的事件: " + s1);
return s1;
}
}), Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
LogUtils.d("B 发送的事件: " + s2);
return s2;
}
}), new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
String res = s + s2;
LogUtils.d("A & B 发送的事件: " + res);
return res;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.d( "onNext: " + s);
}
});
- concatArrayDelayError() & mergeArrayDelayError()& combineLatestDelayError() -------------- 如果有一个被观察者发送了一个Error事件,那么就结束发送,如果你想将Error() 事件延迟到所有被观察者都发送完事件后再执行。
Observable.concatArrayDelayError(
Observable.just(1,2),
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(3);
e.onError(new NullPointerException());
e.onNext(4);
}
}),
Observable.just(5,6)
).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
LogUtils.e("onNext : 当前数字:"+integer);//1,2,3,5,6
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError : "+e.toString());//java.lang.NullPointerException
}
@Override
public void onComplete() {
}
});
Observable.mergeArrayDelayError(
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
return "A" + aLong;
}
}),
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("C1");
e.onNext("C2");
e.onNext("C3");
e.onError(new NullPointerException());
e.onNext("C4");
e.onNext("C5");
}
}),
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
return "B" + aLong;
}
})
).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) { }
@Override
public void onNext(String s) {
LogUtils.e(s);
}
@Override
public void onError(Throwable e) {
LogUtils.e(e.toString());
}
@Override
public void onComplete() { }
});
Observable<String> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
LogUtils.d("A 发送的事件: " + s1);
return s1;
}
});
Observable<String> observable2 = Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
LogUtils.d("B 发送的事件: " + s2);
return s2;
}
});
Observable<String> observable3 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("C1");
e.onNext("C2");
e.onNext("C3");
e.onError(new NullPointerException());
e.onNext("C4");
e.onNext("C5");
}
});
Observable.combineLatestDelayError(new ObservableSource[]{observable1, observable2, observable3}, new Function() {
@Override
public Object apply(Object o) throws Exception {
Object[] objects = (Object[]) o;
String res = "";
for (int i = 0; i < objects.length; i++) {
res = res + String.valueOf(objects[i]);
}
LogUtils.d("A & B & C 发送的事件: " + res);
return res;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
LogUtils.d(s);
}
@Override
public void onError(Throwable e) {
LogUtils.e(e.toString());
}
@Override
public void onComplete() {
}
});
- reduce() ------------ 将所有数据聚合在一起才会发送事件给观察者
Observable.just(1,2,3,4,5,6,7,8)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
int res = integer + integer2;
LogUtils.d("integer : " + integer);
LogUtils.d("integer2 : " + integer2);
LogUtils.d("res : " + res);
return res;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("accept : "+ integer);
}
});
- collect() ------------ 将数据收集到数据结构当中。
Observable.just(1,2,3,4,5,6,7,8)
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
integers.add(integer);
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> integers) throws Exception {
LogUtils.d(integers);
}
});
- startWith() & startWithArray() ------------ 在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出
Observable.just(6, 7, 8)
.startWithArray(3, 4, 5)
.startWith(2)
.startWithArray(0, 1)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d(String.valueOf(integer));
}
});
- count() ------------ 返回被观察者发送事件的数量
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtils.d("发送数量:" + aLong);//8
}
});
功能操作符
- delay() -------------- 延迟一段事件发送事件
Observable.just(1, 2, 3, 4)
.delay(2, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- doOnEach() ---------------- Observable 每发送一个之前都会先回调这个方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
LogUtils.d("doOnEach 方法 "+ integerNotification.getValue());
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- doOnNext() ----------------- Observable 每发送 onNext() 之前都会先回调这个方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnNext(new Consumer<Integer>(){
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("doOnNext 方法 "+ integer);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- doAfterNext() -------------- Observable 每发送 onNext() 之后都会回调这个方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doAfterNext(new Consumer<Integer>(){
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("doAfterNext 方法 "+ integer);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- doOnComplete ------------------ Observable 每发送 onComplete() 之前都会回调这个方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnComplete 方法");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- doOnError() ---------------- Observable 每发送 onError() 之前都会回调这个方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new NullPointerException());
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
LogUtils.e("doOnError() :"+throwable.toString());
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- doOnSubscribe() ----------------- Observable 每发送 onSubscribe() 之前都会回调这个方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
LogUtils.d("doOnSubscribe()方法");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- doOnDispose() -------------- 当调用 Disposable 的取消订阅dispose()方法之后回调该方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnDispose()方法");
}
})
.subscribe(new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
this.disposable = d;
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
if(integer==2){
disposable.dispose();//取消订阅
}
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- doOnLifecycle() ------------- 在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
LogUtils.d("doOnLifecycle accept");
//disposable.dispose();//取消订阅
}
}, new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnLifecycle Action ");
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnDispose()方法");
}
})
.subscribe(new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
this.disposable = d;
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
if (integer == 2) {
disposable.dispose();//取消订阅
}
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- doOnTerminate() & doAfterTerminate() --------------- doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。如果取消订阅之后 doAfterTerminate() 就不会被回调
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnTerminate() 方法");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doAfterTerminate() 方法");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- doFinally() ------------- 在所有事件发送完毕之后回调该方法,doFinally() 在取消订阅后也都会被回调,且都会在事件序列的最后。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doFinally() 方法");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doAfterTerminate() 方法");
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnDispose()方法");
}
})
.subscribe(new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
this.disposable = d;
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
if (integer == 2) {
disposable.dispose();//取消订阅
}
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- onErrorReturn() ------------------- 当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new NullPointerException());
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
LogUtils.e("onErrorReturn() 方法"+throwable.toString());
return 404;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- onErrorResumeNext() ----------- 当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new NullPointerException());
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
LogUtils.e("onErrorResumeNext()方法"+throwable.toString());
return Observable.just(5,6,7,8,9);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- onExceptionResumeNext() -------------- 与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new Exception("111"));
}
})
.onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(404);
observer.onNext(405);
observer.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- retry() ----------------- 如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new NullPointerException());
}
})
.retry(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- retryUntil() --------------- 出现错误事件之后,可以通过此方法判断是否继续发送事件。
final int[] i = {0};
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onNext(5);
e.onError(new NullPointerException());
}
})
.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
if (i[0] >= 6) {//停止继续发送
return true;
} else {
return false;
}
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
i[0] += integer;
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- retryWhen() ---------------- 当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onNext(5);
e.onError(new NullPointerException());
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof NullPointerException){
return Observable.error(new Throwable("终止啦"));
}else{
return Observable.just(6,7,8,9);
}
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- repeat() -------------- 重复发送被观察者的事件,times 为发送次数
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
// e.onError(new NullPointerException());
e.onComplete();
}
})
.repeat(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- repeatWhen() ------------------ 这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
// e.onError(new NullPointerException());
e.onComplete();
}
})
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return Observable.empty();//直接发送 onComplete() 事件
// return Observable.just(1);//不发送任何事件
// return Observable.error(new Exception("404"));//发送 onError() 事件
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- subscribeOn() ------------ 指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
LogUtils.d("当前线程:"+Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
// e.onError(new NullPointerException());
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
LogUtils.d("onSubscribe()方法—当前线程:"+Thread.currentThread().getName());
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
LogUtils.d("onNext()方法—当前线程:"+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
LogUtils.d("onError()方法—当前线程:"+Thread.currentThread().getName());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
LogUtils.d("onComplete()方法—当前线程:"+Thread.currentThread().getName());
}
});
- observeOn() ----------------- 指定观察者的线程,每指定一次就会生效一次
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
LogUtils.d("当前线程:"+Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
// e.onError(new NullPointerException());
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
LogUtils.d("onSubscribe()方法—当前线程:"+Thread.currentThread().getName());
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
LogUtils.d("onNext()方法—当前线程:"+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
LogUtils.d("onError()方法—当前线程:"+Thread.currentThread().getName());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
LogUtils.d("onComplete()方法—当前线程:"+Thread.currentThread().getName());
}
});
过滤操作符
- filter() -------------- 通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送
Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer % 2 == 0) {
return true;
} else {
return false;
}
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- ofType() ----------------- 可以过滤不符合该类型事件
Observable.just("one","two","three",1,2,3)
.ofType(Integer.class)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- skip() & skipLast() --------------- 跳过正序某些事件,count 代表跳过事件的数量
Observable.just(1,2,3,4,5,6)
.skip(2)//跳过前面2个事件
.skipLast(2)//跳过后面2个事件
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- distinct() ------------ 过滤事件序列中的重复事件
Observable.just(1,2,3,4,4,3,2,1)
.distinct()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);//1,2,3,4
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- distinctUntilChanged() ---------------- 过滤掉连续重复的事件
Observable.just(1,2,3,4,4,3,2,1)
.distinctUntilChanged()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);//1,2,3,4,3,2,1
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- take() & takeLast() --------------------- 控制观察者接收的事件的数量
Observable.just(1,2,3,4,5,6,7,8,9)
.take(3)//只接收前面3个
// .takeLast(2)//只接收后面2个
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- debounce() ----------------- 如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
Thread.sleep(900);
e.onNext(2);
}
})
.debounce(1,TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- firstElement() && lastElement() ----------------- firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素
Observable.just(1, 2, 3, 4)
.firstElement()
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("firstElement() 方法 " + integer);
}
});
Observable.just(1, 2, 3, 4)
.lastElement()
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("lastElement() 方法 " + integer);
}
});
- elementAt() & elementAtOrError() -------------- elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。这种情况下,你想发出异常信息的话就用 elementAtOrError()
Observable.just(1, 2, 3, 4)
.elementAt(4)
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("elementAt() 方法 " + integer);
}
});
Observable.just(1, 2, 3, 4)
.elementAtOrError(4)//报错
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("elementAtOrError() 方法 " + integer);
}
});
条件操作符
- all() ---------------- 判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false
Observable.just(1, 2, 3, 4)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 5;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
LogUtils.d("aBoolean : " + aBoolean);
}
});
- takeWhile() ------------- 可以设置条件,当某个数据满足条件时就会发送该数据,反之则不发送
Observable.just(1, 2, 3, 4, 3, 2)
.takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 4;//如果第一条数据没有满足条件,后面的都不会进行
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("integer : " + integer);
}
});
- skipWhile() ---------- 可以设置条件,当某个数据满足条件时不发送该数据,反之则发送
Observable.just(1, 2, 3, 4, 3, 2)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 4;//当满足条件时,后面的都运行
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("integer : " + integer);
}
});
- takeUntil() -------------- 可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了
Observable.just(1, 2, 3, 4, 3, 2)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer >= 3;//当满足条件后,从下一次的事件开始都不会发送了
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("integer : " + integer);
}
});
- skipUntil() ------------ 当 skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(1, 3, 2, 1, TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Long along) {
LogUtils.d("onNext()方法 : " + along);
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- sequenceEqual() --------------- 判断两个 Observable 发送的事件是否相同
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
LogUtils.d("aBoolean : " + aBoolean);
}
});
- contains() ------------- 判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false
Observable.just(1,2,3,4,5)
.contains(3)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
LogUtils.d("aBoolean : " + aBoolean);
}
});
- isEmpty() --------------- 判断事件序列是否为空 ( true :空 )
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onComplete();
}
})
.isEmpty()
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
LogUtils.d("aBoolean : " + aBoolean);
}
});
- amb() --------------- amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃
List<Observable<Long>> list = new ArrayList<>();
list.add(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS));
list.add(Observable.intervalRange(11, 5, 0, 1, TimeUnit.SECONDS));
list.add(Observable.intervalRange(21, 5, 2, 1, TimeUnit.SECONDS));
Observable.amb(list)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}
@Override
public void onNext(Long along) {
LogUtils.d("onNext()方法 : " + along);
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}
@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
- defaultIfEmpty() -------------- 如果观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onComplete();
}
})
.defaultIfEmpty(1001)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("integer : " + integer);
}
});
网友评论