- 组合操作符
3.1 concat()
方法预览:
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
......
有什么用?
可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。
怎么用?
Observable.concat(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8))
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印如下:
05-21 15:40:26.738 7477-7477/com.example.rxjavademo D/chan: ================onNext 1
================onNext 2
05-21 15:40:26.739 7477-7477/com.example.rxjavademo D/chan: ================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8
3.2 concatArray()
方法预览:
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
有什么用?
与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。
怎么用?
Observable.concatArray(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8),
Observable.just(9, 10))
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印结果:
05-21 15:47:18.581 9129-9129/com.example.rxjavademo D/chan: ================onNext 1
================onNext 2
================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8
================onNext 9
================onNext 10
3.3 merge()
方法预览:
public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
......
有什么用?
这个方法月 concat() 作用基本一样,知识 concat() 是串行发送事件,而 merge() 并行发送事件。
怎么用?
现在来演示 concat() 和 merge() 的区别。
Observable.merge(
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
@Override
public String apply(Long aLong) throws Exception {
return "A" + aLong;
}
}),
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
@Override
public String apply(Long aLong) throws Exception {
return "B" + aLong;
}
}))
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "=====================onNext " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印结果如下:
05-21 16:10:31.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B0
05-21 16:10:31.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A0
05-21 16:10:32.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A1
05-21 16:10:32.126 12801-12850/com.example.rxjavademo D/chan: =====================onNext B1
05-21 16:10:33.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A2
05-21 16:10:33.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B2
05-21 16:10:34.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A3
05-21 16:10:34.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B3
05-21 16:10:35.124 12801-12849/com.example.rxjavademo D/chan: =====================onNext A4
05-21 16:10:35.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B4
05-21 16:10:36.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A5
05-21 16:10:36.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B5
......
从结果可以看出,A 和 B 的事件序列都可以发出,将以上的代码换成 concat() 看看打印结果:
05-21 16:17:52.352 14597-14621/com.example.rxjavademo D/chan: =====================onNext A0
05-21 16:17:53.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A1
05-21 16:17:54.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A2
05-21 16:17:55.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A3
05-21 16:17:56.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A4
05-21 16:17:57.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A5
......
从结果可以知道,只有等到第一个被观察者发送完事件之后,第二个被观察者才会发送事件。
mergeArray() 与 merge() 的作用是一样的,只是它可以发送4个以上的被观察者,这里就不再赘述了。
3.4 concatArrayDelayError() & mergeArrayDelayError()
方法预览:
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
有什么用?
在 concatArray() 和 mergeArray() 两个方法当中,如果其中有一个被观察者发送了一个 Error 事件,那么就会停止发送事件,如果你想 onError() 事件延迟到所有被观察者都发送完事件后再执行的话,就可以使用 concatArrayDelayError() 和 mergeArrayDelayError()
怎么用?
首先使用 concatArray() 来验证一下发送 onError() 事件是否会中断其他被观察者发送事件,代码如下:
Observable.concatArray(
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onError(new NumberFormatException());
}
}), Observable.just(2, 3, 4))
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "===================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
}
});
打印结果:
05-21 16:38:59.725 17985-17985/com.example.rxjavademo D/chan: ===================onNext 1
===================onError
从结果可以知道,确实中断了,现在换用 concatArrayDelayError(),代码如下:
Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onError(new NumberFormatException());
}
}), Observable.just(2, 3, 4))
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "===================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
}
});
打印结果如下:
05-21 16:40:59.329 18199-18199/com.example.rxjavademo D/chan: ===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 4
===================onError
从结果可以看到,onError 事件是在所有被观察者发送完事件才发送的。mergeArrayDelayError() 也是有同样的作用,这里不再赘述。
3.5 zip()
方法预览:
public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper)
......
有什么用?
会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。
怎么用?
Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
Log.d(TAG, "===================A 发送的事件 " + s1);
return s1;
}}),
Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
Log.d(TAG, "===================B 发送的事件 " + s2);
return s2;
}
}),
new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
String res = s + s2;
return res;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "===================onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
上面代码中有两个 Observable,第一个发送事件的数量为5个,第二个发送事件的数量为6个。现在来看下打印结果:
05-22 09:10:39.952 5338-5338/com.example.rxjavademo D/chan: ===================onSubscribe
05-22 09:10:40.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
05-22 09:10:40.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
===================onNext A1B1
05-22 09:10:41.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
05-22 09:10:41.954 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
===================onNext A2B2
05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
05-22 09:10:42.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================onNext A3B3
05-22 09:10:43.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
05-22 09:10:43.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
05-22 09:10:43.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A4B4
05-22 09:10:44.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A5
05-22 09:10:44.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
05-22 09:10:44.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A5B5
===================onComplete
可以发现最终接收到的事件数量是5,那么为什么第二个 Observable 没有发送第6个事件呢?因为在这之前第一个 Observable 已经发送了 onComplete 事件,所以第二个 Observable 不会再发送事件。
3.6 combineLatest() & combineLatestDelayError()
方法预览:
public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
.......
有什么用?
combineLatest() 的作用与 zip() 类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送,这样可能还是比较抽象,看看以下例子代码。
怎么用?
Observable.combineLatest(
Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
.map(new Function < Long, String > () {@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
Log.d(TAG, "===================A 发送的事件 " + s1);
return s1;
}
}),
Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)
.map(new Function < Long, String > () {@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
Log.d(TAG, "===================B 发送的事件 " + s2);
return s2;
}
}),
new BiFunction < String, String, String > () {@Override
public String apply(String s, String s2) throws Exception {
String res = s + s2;
return res;
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "===================最终接收到的事件 " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
分析上面的代码,Observable A 会每隔1秒就发送一次事件,Observable B 会隔2秒发送一次事件。来看看打印结果:
05-22 11:41:20.859 15104-15104/? D/chan: ===================onSubscribe
05-22 11:41:21.859 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
05-22 11:41:22.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
05-22 11:41:22.861 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
05-22 11:41:22.862 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A2B1
05-22 11:41:23.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
===================最终接收到的事件 A3B1
05-22 11:41:24.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
05-22 11:41:24.861 15104-15128/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B1
05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B2
05-22 11:41:26.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
05-22 11:41:26.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B3
05-22 11:41:28.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
05-22 11:41:28.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B4
05-22 11:41:30.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
05-22 11:41:30.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B5
===================onComplete
分析上述结果可以知道,当发送 A1 事件之后,因为 B 并没有发送任何事件,所以根本不会发生结合。当 B 发送了 B1 事件之后,就会与 A 最近发送的事件 A2 结合成 A2B1,这样只有后面一有被观察者发送事件,这个事件就会与其他被观察者最近发送的事件结合起来了。
因为 combineLatestDelayError() 就是多了延迟发送 onError() 功能,这里就不再赘述了。
3.7 reduce()
方法预览:
public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
有什么用?
与 scan() 操作符的作用也是将发送数据以一定逻辑聚合起来,这两个的区别在于 scan() 每处理一次数据就会将事件发送给观察者,而 reduce() 会将所有数据聚合在一起才会发送事件给观察者。
怎么用?
Observable.just(0, 1, 2, 3)
.reduce(new BiFunction < Integer, Integer, Integer > () {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
int res = integer + integer2;
Log.d(TAG, "====================integer " + integer);
Log.d(TAG, "====================integer2 " + integer2);
Log.d(TAG, "====================res " + res);
return res;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "==================accept " + integer);
}
});
打印结果:
05-22 14:21:46.042 17775-17775/? D/chan: ====================integer 0
====================integer2 1
====================res 1
====================integer 1
====================integer2 2
====================res 3
====================integer 3
====================integer2 3
====================res 6
==================accept 6
从结果可以看到,其实就是前2个数据聚合之后,然后再与后1个数据进行聚合,一直到没有数据为止。
3.8 collect()
方法预览:
public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)
#### 有什么用?
将数据收集到数据结构当中。
怎么用?
Observable.just(1, 2, 3, 4)
.collect(new Callable < ArrayList < Integer >> () {
@Override
public ArrayList < Integer > call() throws Exception {
return new ArrayList < > ();
}
},
new BiConsumer < ArrayList < Integer > , Integer > () {
@Override
public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
integers.add(integer);
}
})
.subscribe(new Consumer < ArrayList < Integer >> () {
@Override
public void accept(ArrayList < Integer > integers) throws Exception {
Log.d(TAG, "===============accept " + integers);
}
});
打印结果:
05-22 16:47:18.257 31361-31361/com.example.rxjavademo D/chan: ===============accept [1, 2, 3, 4]
3.9 startWith() & startWithArray()
方法预览:
public final Observable<T> startWith(T item)
public final Observable<T> startWithArray(T... items)
有什么用?
在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。
怎么用?
Observable.just(5, 6, 7)
.startWithArray(2, 3, 4)
.startWith(1)
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "================accept " + integer);
}
});
打印结果:
05-22 17:08:21.282 4505-4505/com.example.rxjavademo D/chan: ================accept 1
================accept 2
================accept 3
================accept 4
================accept 5
================accept 6
================accept 7
3.10 count()
方法预览:
public final Single<Long> count()
有什么用?
返回被观察者发送事件的数量。
怎么用?
Observable.just(1, 2, 3)
.count()
.subscribe(new Consumer < Long > () {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "=======================aLong " + aLong);
}
});
打印结果:
05-22 20:41:25.025 14126-14126/? D/chan: =======================aLong 3
- 功能操作符
4.1 delay()
方法预览:
public final Observable<T> delay(long delay, TimeUnit unit)
有什么用?
延迟一段事件发送事件。
怎么用?
Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=======================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=======================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "=======================onSubscribe");
}
});
这里延迟了两秒才发送事件,来看看打印结果:
05-22 20:53:43.618 16880-16880/com.example.rxjavademo D/chan: =======================onSubscribe
05-22 20:53:45.620 16880-16906/com.example.rxjavademo D/chan: =======================onNext 1
05-22 20:53:45.621 16880-16906/com.example.rxjavademo D/chan: =======================onNext 2
=======================onNext 3
=======================onSubscribe
从打印结果可以看出 onSubscribe 回调2秒之后 onNext 才会回调。
4.2 doOnEach()
方法预览:
public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification)
有什么用?
Observable 每发送一件事件之前都会先回调这个方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
// e.onError(new NumberFormatException());
e.onComplete();
}
})
.doOnEach(new Consumer < Notification < Integer >> () {
@Override
public void accept(Notification < Integer > integerNotification) throws Exception {
Log.d(TAG, "==================doOnEach " + integerNotification.getValue());
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印结果:
05-23 09:07:05.547 19867-19867/? D/chan: ==================onSubscribe
==================doOnEach 1
==================onNext 1
==================doOnEach 2
==================onNext 2
==================doOnEach 3
==================onNext 3
==================doOnEach null
==================onComplete
从结果就可以看出每发送一个事件之前都会回调 doOnEach 方法,并且可以取出 onNext() 发送的值。
4.3 doOnNext()
方法预览:
public final Observable<T> doOnNext(Consumer<? super T> onNext)
有什么用?
Observable 每发送 onNext() 之前都会先回调这个方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnNext(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "==================doOnNext " + integer);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印结果:
05-23 09:09:36.769 20020-20020/com.example.rxjavademo D/chan: ==================onSubscribe
==================doOnNext 1
==================onNext 1
==================doOnNext 2
==================onNext 2
==================doOnNext 3
==================onNext 3
==================onComplete
4.4 doAfterNext()
方法预览:
public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)
有什么用?
Observable 每发送 onNext() 之后都会回调这个方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doAfterNext(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "==================doAfterNext " + integer);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印结果:
05-23 09:15:49.215 20432-20432/com.example.rxjavademo D/chan: ==================onSubscribe
==================onNext 1
==================doAfterNext 1
==================onNext 2
==================doAfterNext 2
==================onNext 3
==================doAfterNext 3
==================onComplete
4.5 doOnComplete()
方法预览:
public final Observable<T> doOnComplete(Action onComplete)
有什么用?
Observable 每发送 onComplete() 之前都会回调这个方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnComplete ");
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印结果:
05-23 09:32:18.031 20751-20751/? D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnComplete
==================onComplete
4.6 doOnError()
方法预览:
public final Observable<T> doOnError(Consumer<? super Throwable> onError)
有什么用?
Observable 每发送 onError() 之前都会回调这个方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
}
})
.doOnError(new Consumer < Throwable > () {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "==================doOnError " + throwable);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印结果:
05-23 09:35:04.150 21051-21051/? D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnError java.lang.NullPointerException
==================onError
4.7 doOnSubscribe()
方法预览:
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)
有什么用?
Observable 每发送 onSubscribe() 之前都会回调这个方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnSubscribe(new Consumer < Disposable > () {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "==================doOnSubscribe ");
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印结果:
05-23 09:39:25.778 21245-21245/? D/chan: ==================doOnSubscribe
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
4.8 doOnDispose()
方法预览:
public final Observable<T> doOnDispose(Action onDispose)
有什么用?
当调用 Disposable 的 dispose() 之后回调该方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnDispose ");
}
})
.subscribe(new Observer < Integer > () {
private Disposable d;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
this.d = d;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
d.dispose();
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印结果:
05-23 09:55:48.122 22023-22023/com.example.rxjavademo D/chan: ==================onSubscribe
==================onNext 1
==================doOnDispose
4.9 doOnLifecycle()
方法预览:
public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
有什么用?
在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅。
怎么用?
doOnLifecycle() 第二个参数的回调方法的作用与 doOnDispose() 是一样的,现在用下面的例子来讲解:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "==================doOnLifecycle accept");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnLifecycle Action");
}
})
.doOnDispose(
new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnDispose Action");
}
})
.subscribe(new Observer<Integer>() {
private Disposable d;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
this.d = d;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
d.dispose();
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印结果:
05-23 10:20:36.345 23922-23922/? D/chan: ==================doOnLifecycle accept
==================onSubscribe
==================onNext 1
==================doOnDispose Action
==================doOnLifecycle Action
可以看到当在 onNext() 方法进行取消订阅操作后,doOnDispose() 和 doOnLifecycle() 都会被回调。
如果使用 doOnLifecycle 进行取消订阅,来看看打印结果:
05-23 10:32:20.014 24652-24652/com.example.rxjavademo D/chan: ==================doOnLifecycle accept
==================onSubscribe
可以发现 doOnDispose Action 和 doOnLifecycle Action 都没有被回调。
4.10 doOnTerminate() & doAfterTerminate()
方法预览:
public final Observable<T> doOnTerminate(final Action onTerminate)
public final Observable<T> doAfterTerminate(Action onFinally)
有什么用?
doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。
怎么用?
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
// e.onError(new NullPointerException());
e.onComplete();
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnTerminate ");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印结果:
05-23 10:00:39.503 22398-22398/com.example.rxjavademo D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
05-23 10:00:39.504 22398-22398/com.example.rxjavademo D/chan: ==================onNext 3
==================doOnTerminate
==================onComplete
doAfterTerminate 也是差不多,这里就不再赘述。
4.11 doFinally()
方法预览:
public final Observable<T> doFinally(Action onFinally)
有什么用?
在所有事件发送完毕之后回调该方法。
怎么用?
这里可能你会有个问题,那就是 doFinally() 和 doAfterTerminate() 到底有什么区别?区别就是在于取消订阅,如果取消订阅之后 doAfterTerminate() 就不会被回调,而 doFinally() 无论怎么样都会被回调,且都会在事件序列的最后。
现在用以下例子说明下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doFinally ");
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnDispose ");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doAfterTerminate ");
}
})
.subscribe(new Observer<Integer>() {
private Disposable d;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
this.d = d;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
d.dispose();
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
打印结果:
05-23 10:10:10.469 23196-23196/? D/chan: ==================onSubscribe
05-23 10:10:10.470 23196-23196/? D/chan: ==================onNext 1
==================doOnDispose
==================doFinally
可以看到如果调用了 dispose() 方法,doAfterTerminate() 不会被回调。
现在试试把 dispose() 注释掉看看,看看打印结果:
05-23 10:13:34.537 23439-23439/com.example.rxjavademo D/chan: ==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
==================doAfterTerminate
==================doFinally
doAfterTerminate() 已经成功回调,doFinally() 还是会在事件序列的最后。
网友评论