本篇文章使用的版本
implementation "io.reactivex.rxjava2:rxjava:2.2.5"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
五种观察者模式
背压机制:在异步场景下,被观察者发送事件的速度 远快于 观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略
在进行异步操作时会通过缓存来存储发射的数据。
在 RxJava1.x 时,这些缓存是无界的,当需要缓存的数据非常多的时候,会占用非常多的存储空间,并有可能因为虚拟机不断 GC 而导致程序执行过慢,甚至 OOM。
在 RxJava2.x 中,大多数的异步操作内存都存在一个有界的缓存,当超出这个缓存的时候会抛出 MissingBackpressureException 异常并结束整个队列。
注意:2.0版本 .subscribe(Observer) 方法没有返回值 void,如有需要可参考使用 subscribeWith();但是 不完全参数
public final Disposable subscribe(Consumer<? super T> onNext)
有返回值。
- Observable.subscribe(Observer)
不支持背压
// package io.reactivex;
public abstract class Observable<T> implements ObservableSource<T>
// package io.reactivex;
public interface ObservableSource<T> {
/**
* 注意:2.0版本没有返回值 void
* 如有需要可参考使用 subscribeWith()
*/
void subscribe(@NonNull Observer<? super T> observer);
}
// package io.reactivex;
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
// package io.reactivex.disposables;
public interface Disposable {
void dispose();
boolean isDisposed();
}
- Flowable.subscribe(Subscriber)
支持背压
// package io.reactivex;
public abstract class Flowable<T> implements Publisher<T>
// package org.reactivestreams;
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
// package org.reactivestreams;
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
// package org.reactivestreams;
public interface Subscription {
public void request(long n);
public void cancel();
}
- Single. subscribe(SingleObserver)
只有一个 onSuccess 或者 onError
// package io.reactivex;
public abstract class Single<T> implements SingleSource<T>
// package io.reactivex;
public interface SingleSource<T> {
void subscribe(@NonNull SingleObserver<? super T> observer);
}
// package io.reactivex;
public interface SingleObserver<T> {
void onSubscribe(@NonNull Disposable d);
void onSuccess(@NonNull T t);
void onError(@NonNull Throwable e);
}
// package io.reactivex.disposables;
public interface Disposable {
void dispose();
boolean isDisposed();
}
- Completable.subscribe(CompletableObserver)
只有一个 onComplete 或者 onError
// package io.reactivex;
public abstract class Completable implements CompletableSource
// package io.reactivex;
public interface CompletableSource {
void subscribe(@NonNull CompletableObserver co);
}
// package io.reactivex;
public interface CompletableObserver {
void onSubscribe(@NonNull Disposable d);
void onComplete();
void onError(@NonNull Throwable e);
}
// package io.reactivex.disposables;
public interface Disposable {
void dispose();
boolean isDisposed();
}
- Maybe. subscribe(MaybeObserver)
只有一个 onSuccess 或者 onError 或者 onComplete
// package io.reactivex;
public abstract class Maybe<T> implements MaybeSource<T>
// package io.reactivex;
public interface MaybeSource<T> {
void subscribe(@NonNull MaybeObserver<? super T> observer);
}
// package io.reactivex;
public interface MaybeObserver<T> {
void onSubscribe(@NonNull Disposable d);
void onSuccess(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
// package io.reactivex.disposables;
public interface Disposable {
void dispose();
boolean isDisposed();
}
Observable 操作符使用
看图即可,之后具体分析可忽略,直接看下一节
RxJava2操作符
创建操作符
create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
just()
不超过10个
public static <T> Observable<T> just(T item)
...
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
from 系列
public static <T> Observable<T> fromArray(T... items)
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
defer()
只有在观察者订阅的时候才会创建被观察者
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(1);
}
});
timer()
当到指定时间后发送一个 0L 的值给观察者
public static Observable<Long> timer(long delay, TimeUnit unit)
interval
每隔一段时间发送一个事件,不断增1的数字
public static Observable<Long> interval(long period, TimeUnit unit)
intervalRange
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
range
public static Observable<Integer> range(final int start, final int count)
rangeLong
public static Observable<Long> rangeLong(long start, long count)
empty & never & error
// 不发送任何事件
public static <T> Observable<T> never()
// 发送 onComplete() 事件
public static <T> Observable<T> empty()
// 发送 onError() 事件
public static <T> Observable<T> error(final Throwable exception)
repeat() & repeatWhen()
转换操作符
map() & cast()
一对一
map cast
flatMap()& concatMap()
一对多,flatMap不保证事件的顺序,concatMap与上游发送的顺序一致
buffer()
将整个事件流进行分组
Observable.range(1,7)
.buffer(3)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.i(TAG,integers.toString());
Log.i(TAG,"----");
}
});
输出
01-29 15:03:09.570 I/TestRxJava2Operate: [1, 2, 3]
01-29 15:03:09.570 I/TestRxJava2Operate: ----
01-29 15:03:09.570 I/TestRxJava2Operate: [4, 5, 6]
01-29 15:03:09.570 I/TestRxJava2Operate: ----
01-29 15:03:09.570 I/TestRxJava2Operate: [7]
01-29 15:03:09.570 I/TestRxJava2Operate: ----
buffer
groupBy()
groupByscan()
累加器 accumulator,依次输出
scan
reduce()
和 scan() 一样累加,只是只发送最后一个值
window()
和 buffer 类似,但不是发射来自 Observable 的数据包,发射的是 Observable,最后发射一个 onCompleted 通知
[图片上传失败...(image-d3834c-1548834337169)]
过滤操作符
filter()
规则过滤
distinct() & distinctUntilChanged()
去重过滤
skip() & skipLast() & skipWhile() & skipUntil()
过滤掉前几项
take() & takeLast() & takeUntil() & takeWhile()
只保留前几项
elementAt() & firstElement() & lastElement()
获取队列中指定位置的事件
ignoreElements()
过滤掉所有队列中的事件,只保留 onComplete / onError
throttleFirst() & throttleLast & throttleLatest & throttleWithTimeout
对时间进行切片,选取第一个,最后一个,最近的一个
throttleLast 底层使用的 sample 方法实现
throttleWithTimeout 底层使用的 debounce 方法实现;仅在过了一段指定的时间还没有发射数据时才发射一个数据,如果在一个事件片达到之前,发射的数据之后又紧跟着发射了一个数据,那么这个时间片 之前发射的数据会被丢弃
debounce()
用来限制发射频率过快的,它仅在过了一段指定的时间还没发射数据时才发射一个数据,否则丢弃之前的数据
debounce
sample()
throttleLast 内部实现调用的是 sample
sample
ofType()
过滤掉不符合该类型的事件
组合操作符
startWith() & startWithArray()
在事件队列之前插入数据
merge() & mergeArray() & mergeDelayError & mergeArrayDelayError()
多个事件队列合并起来发射,可能交错无序
merge() 和 mergeError() 只有在处理错误 onError 时不同。mergeError() 在错误之前所有事件发射完毕之后才把错误发射出来,多个错误只发射一个错误;
merge() 在遇到错误时,直接抛出来结束操作
concat() & concatArray() & concatEager()& concatDelayError & concatArrayDelayError()
多个事件队列合并起来发射,严格按顺序发射
concatEager方法,当一个观察者订阅了它的结果,那么就相当于订阅了它拼接的所有ObservableSource,并且会先缓存这些ObservableSource发射的数据,然后再按照顺序将它们发射出来;
zip() & zipArray() & zipIterable()
将多个数据项合并,可以指定合并规则
combineLatest() & combineLatestDelayError()
组合最近的俩个数据
image
collect()
将数据收集到数据结构中
Observable.just(1, 2, 3, 4)
.collect(new Callable < ArrayList < Integer >> () {
@Override
public ArrayList < Integer > call() throws Exception {
return new ArrayList < > ();
}
},
new BiConsumer < ArrayList < Integer > , Integer > () {
@Override
public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
integers.add(integer);
}
})
.subscribe(new Consumer < ArrayList < Integer >> () {
@Override
public void accept(ArrayList < Integer > integers) throws Exception {
Log.d(TAG, "===============accept " + integers);
}
});
辅助操作符
delay()
在发送事件前延迟
doXXX 系列
doOnSubscribe,在 subscribe() 订阅之前触发
doOnLifecycle,
doOnEach,在每个 onNext() 调用之前触发
doOnNext,在 onNext() 之前触发
doAfterNext,在 onNext() 方法之后触发
doOnTerminate,在 Observable 终止onComplete() / onError()之前触发
doOnComplete,在 onComplete() 之前触发
doOnError,在 onError() 之前触发
doFinally,在 onComplete() 或 onError() 结束之后触发
doAfterTerminate,在 Observable 终止之后触发
doOnDispose,在 dispose 之前触发
输出示例:
01-28 16:08:30.460 I/TestRxJava2: doOnSubscribe
01-28 16:08:30.460 I/TestRxJava2: doOnLifecycle Consumer
01-28 16:08:30.460 I/TestRxJava2: Observer onSubscribe
01-28 16:08:30.460 I/TestRxJava2: ObservableEmitter onNext 0
01-28 16:08:30.460 I/TestRxJava2: doOnEach
01-28 16:08:30.460 I/TestRxJava2: doOnNext
01-28 16:08:30.460 I/TestRxJava2: Observer onNext 0
01-28 16:08:30.460 I/TestRxJava2: doAfterNext
01-28 16:08:30.460 I/TestRxJava2: ObservableEmitter onComplete
01-28 16:08:30.460 I/TestRxJava2: doOnTerminate
01-28 16:08:30.460 I/TestRxJava2: doOnComplete
01-28 16:08:30.460 I/TestRxJava2: doOnEach
01-28 16:08:30.460 I/TestRxJava2: Observer onComplete
01-28 16:08:30.460 I/TestRxJava2: doFinally
01-28 16:08:30.460 I/TestRxJava2: doAfterTerminate
subscribeOn() & observeOn()
线程切换,subscribeOn()指定观察者运行的线程;observeOn()指定被观察者发射事件所运行的线程
timeout()
设置超时时间,指定时间内没有任何数据,就执行我们的数据项,后续事件不再发送
错误处理操作符
onErrorReturn() & onErrorResumeNext() & onExceptionResumeNext()
onErrorReturn(): 在触发 onError 的时候用一个值代替,并调用 onCompleted() 结束本次队列,而不会将错误传递给观察者
onErrorResumeNext(): 用 Observable 代替
onExceptionResumeNext(): 只处理 Exception() ,如果是 Error 则不处理。(二者都继承 Throwable)
retry() & retryUtil() & retryWhen()
重复试错
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("Error1"));
// emitter.onError(new Throwable("Error2"));
emitter.onNext(3);
}
})
.retry(2, new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
return true;
}
})
.subscribe();
// 输出
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onSubscribe false
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 1
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 2
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 1
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 2
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 1
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 2
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onError: java.lang.Throwable: Error1
条件操作符
all() & any()
all(): 是否全部满足
any(): 是否存在一个
contains() & isEmpty()
contains(): 是否包含指定项
isEmpty(): 是否为空
sequenceEqual()
判断俩个序列是否相等
amb()
作用两个或多个 Observable,但是只会发射最先发射数据的那个 Observable 的全部数据
Observable
.amb(Arrays.asList(Observable.range(1, 5),
Observable.range(6, 5)))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "testAmb " + integer);
}
});
defaultIfEmpty()
当指定的序列为空的时候指定一个用于发射的值,需要调用 onCompleted()
转换操作符
toList() & toSortedList()
toMap() & toMultimap()
toFlowable()
to()
Flowable 背压机制
使用示例
示例1
Flowable
.range(0, 100)
.onBackpressureBuffer()
.observeOn(Schedulers.io())
.subscribe(new Subscriber<Integer>() {
Subscription sub;
@Override
public void onSubscribe(Subscription s) {
Log.w(TAG, "onsubscribe start");
sub = s;
s.request(2);
Log.w(TAG, "onsubscribe end");
}
@Override
public void onNext(Integer integer) {
Log.w(TAG, "onNext--->" + integer);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sub.request(3);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.w(TAG, "onComplete");
}
});
输出结果如下
01-28 10:48:09.040 W/TestRxJava2: onsubscribe start
01-28 10:48:09.040 W/TestRxJava2: onsubscribe end
01-28 10:48:09.050 W/TestRxJava2: onNext--->0
01-28 10:48:11.050 W/TestRxJava2: onNext--->1
01-28 10:48:13.050 W/TestRxJava2: onNext--->2
01-28 10:48:15.050 W/TestRxJava2: onNext--->3
01-28 10:48:17.050 W/TestRxJava2: onNext--->4
示例2
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "Consumer accept integer: " + integer);
}
});
解决背压问题
导致的原因:使用 .observeOn()
方法监听了 非背压 的PublishProcessor
、timer()
、interval()
或者自定义的create()
。
.observeOn()
方法的默认缓存大小是128,当生产的速度过快时,会很快超出该缓存大小,从而导致内存溢出。
1 增加缓存大小
使用.observeOn()
重载方法来设置缓存的大小
PublishProcessor<Integer> source = PublishProcessor.create();
source
.observeOn(Schedulers.computation(), false, 1024 * 1024)
// .observeOn(Schedulers.computation())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
compute(integer);
}
});
for (int i = 1; i < 1_000_000; i++) {
source.onNext(i);
}
只解决暂时背压问题,当生产速率过快的时候还是可能造成缓存溢出
2 通过丢弃或者过滤来减轻缓存压力
使用 throttleXXX
或者 sample()
等方式减少接收的数据
PublishProcessor<Integer> source = PublishProcessor.create();
source
.sample(1, TimeUnit.SECONDS)
.observeOn(Schedulers.computation())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
compute(integer);
}
});
for (int i = 1; i < 1_000_000; i++) {
source.onNext(i);
}
该方式仅用来减少下游接收的数据,不改变上游发送数据和缓存数据,解决背压问题,但还是会导致缓存溢出
3 onBackpressureBuffer
onBackpressureBuffer()
无参的方式使用一个无界的缓存,只要虚拟机没有抛出 OOM 异常,它就会把所有的数据缓存下来,而只会将一小部分的数据传递给 observeOn
Flowable
.range(1, Integer.MAX_VALUE)
.onBackpressureBuffer()
.observeOn(Schedulers.computation(), false, 8)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
compute(integer);
}
});
这种处理方式实际上不存在背压,因为 onBackpressureBuffer 缓存了所有的数据
onBackpressureBuffer 系列
onBackpressureBuffer(boolean delayError)
onBackpressureBuffer(int capacity)
onBackpressureBuffer(int capacity, boolean delayError)
onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded)
onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded,Action onOverflow)
onBackpressureBuffer(int capacity, Action onOverflow)
onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy)
capacity:指定有界缓存
delayError:是否延迟抛出异常
unbounded:无界
onOverflow:缓存溢出回调
overflowStrategy:缓存策略
public enum BackpressureOverflowStrategy {
/** 抛出异常 BufferOverflowException */
ERROR,
/** 丢去最老的值*/
DROP_OLDEST,
/** 丢弃最新的值 */
DROP_LATEST
}
4 onBackpressureDrop
不会缓存任何数据,专注当下,新来的数据来不及处理就丢掉,以后会有更好的
bp.obp.drop.png
5 onBackpressureLatest
会缓存一个数据,当正在执行某个人物的时候有新的数据过来,会把它缓存起来,如果又有新的数据过来,那就把之前的替换掉,缓存里面的总是最新的
bp.obp.latest.png
操作符
public interface Action {
void run() throws Exception;
}
public interface Consumer<T> {
void accept(T t) throws Exception;
}
public interface BiConsumer<T1, T2> {
void accept(T1 t1, T2 t2) throws Exception;
}
public interface LongConsumer {
void accept(long t) throws Exception;
}
public interface Function<T, R> {
R apply(@NonNull T t) throws Exception;
}
public interface BiFunction<T1, T2, R> {
R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}
public interface Function3<T1, T2, T3, R> {
R apply(@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3) throws Exception;
}
... Function4 ... Function8 ...
public interface Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R> {
R apply(@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3, @NonNull T4 t4, @NonNull T5 t5, @NonNull T6 t6, @NonNull T7 t7, @NonNull T8 t8, @NonNull T9 t9) throws Exception;
}
public interface IntFunction<T> {
T apply(int i) throws Exception;
}
public interface Predicate<T> {
boolean test(@NonNull T t) throws Exception;
}
public interface BiPredicate<T1, T2> {
boolean test(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}
public interface Cancellable {
void cancel() throws Exception;
}
public interface BooleanSupplier {
boolean getAsBoolean() throws Exception; // NOPMD
}
非背压 Subject
Subject 可以同时代表 Observer 和 Observable,允许从数据源中多次发送结果给多个观察者。除了 onSubscribe(), onNext(), onError() 和 onComplete() 之外,所有的方法都是线程安全的。此外,你还可以使用 toSerialized() 方法,也就是转换成串行的,将这些方法设置成线程安全的,如 PublishSubject.create().toSerialized()
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
public abstract boolean hasObservers();
public abstract boolean hasThrowable();
public abstract boolean hasComplete();
public abstract Throwable getThrowable();
public final Subject<T> toSerialized() {
if (this instanceof SerializedSubject) {
return this;
}
return new SerializedSubject<T>(this);
}
}
AsyncSubject
、BehaviorSubjec
t、SingleSubject
、PublishSubject
、ReplaySubject
、UnicastSubject
、CompletableSubject
、MaybeSubject
等均继承 Subject
AsyncSubject
非粘性,只有先注册后发送事件才能接收
只有调用 onComplete 才能触发
public void testAsyncSubject() {
AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("one");
subject.onNext("two");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "testAsyncSubject: " + s);
}
});
subject.onNext("three");
subject.onComplete();
}
输出
01-27 18:18:03.190 I/TestRxJava2: testAsyncSubject: three
BehaviorSubject
能收到订阅之前的最后一个事件 和订阅之后发送的事件
BehaviorSubject<String> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onNext("two");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "testBehaviorSubject first: " + s);
}
});
subject.onNext("three");
subject.onNext("four");
输出
01-27 18:18:27.990 I/TestRxJava2: testBehaviorSubject: two
01-27 18:18:27.990 I/TestRxJava2: testBehaviorSubject: three
01-27 18:18:27.990 I/TestRxJava2: testBehaviorSubject: four
PublishSubject
非粘性,只有先注册后发送事件才能接收
PublishSubject<Integer> subject = PublishSubject.create();
subject.onNext(1);
subject.onNext(2);
subject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "testPublishSubject first: " + integer);
}
});
subject.onNext(3);
subject.onNext(4);
subject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "testPublishSubject second: " + integer);
}
});
subject.onNext(5);
subject.onNext(6);
输出
01-27 18:14:33.600 I/TestRxJava2: testPublishSubject first: 3
01-27 18:14:33.600 I/TestRxJava2: testPublishSubject first: 4
01-27 18:14:33.600 I/TestRxJava2: testPublishSubject first: 5
01-27 18:14:33.600 I/TestRxJava2: testPublishSubject second: 5
01-27 18:14:33.600 I/TestRxJava2: testPublishSubject first: 6
01-27 18:14:33.600 I/TestRxJava2: testPublishSubject second: 6
ReplaySubject
粘性事件
ReplaySubject<String> subject = ReplaySubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onNext("two");
subject.onNext("four");
subject.onNext("five");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "testRelaySubject first: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.i(TAG, "testRelaySubject first error");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "testRelaySubject first onComplete");
}
});
subject.onNext("six");
subject.onNext("seven");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "testRelaySubject second: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.i(TAG, "testRelaySubject second error");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "testRelaySubject second onComplete");
}
});
subject.onNext("eight");
subject.onNext("nine");
subject.onComplete();
输出
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: zero
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: one
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: two
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: four
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: five
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: six
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: seven
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: zero
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: one
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: two
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: four
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: five
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: six
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: seven
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: eight
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: eight
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: nine
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: nine
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first onComplete
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second onComplete
UnicastSubject
粘性事件,只能有一个观察者,java.lang.IllegalStateException: Only a single observer allowed.
UnicastSubject<Integer> subject = UnicastSubject.create();
subject.onNext(0);
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
subject.onNext(5);
subject.onNext(6);
subject.onNext(7);
subject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "testUnicastSubject first: " + integer);
}
});
subject.onNext(8);
subject.onNext(9);
输出
01-27 18:22:10.440 I/TestRxJava2: testUnicastSubject first: 0
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 1
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 2
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 3
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 4
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 5
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 6
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 7
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 8
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 9
背压 Processor
public abstract class FlowableProcessor<T> extends Flowable<T> implements Processor<T, T>, FlowableSubscriber<T> {
public abstract boolean hasSubscribers();
public abstract boolean hasThrowable();
public abstract boolean hasComplete();
public abstract Throwable getThrowable();
public final FlowableProcessor<T> toSerialized() {
if (this instanceof SerializedProcessor) {
return this;
}
return new SerializedProcessor<T>(this);
}
}
以下均继承FlowableProcessor
AsyncProcessor
BehaviorProcessor --
MulticastProcessor
PublishProcessor --
ReplayProcessor
SerializedProcessor --
UnicastProcessor
Scheduler 线程调度
computation()
io()
trampoline()
newThread()
single()
from(@NonNull Executor executor)
统一取消订阅
CompositeDisposable 统一订阅
subscribeWith 返回观察者
ResourceSubscriber 等实现 Disposable 接口
CompositeDisposable compositeDisposable = new CompositeDisposable();
ResourceSubscriber<Integer> resourceSubscriber
= Flowable
.range(1, 8)
.subscribeWith(new ResourceSubscriber<Integer>() {
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
compositeDisposable.add(resourceSubscriber);
类似 ResourceSubscriber 的还有
Subscribers 系列
public abstract class DisposableSubscriber<T> implements FlowableSubscriber<T>, Disposable
public abstract class ResourceSubscriber<T> implements FlowableSubscriber<T>, Disposable
// 没有实现 Disposable,而是实现了Subscription
public final class SafeSubscriber<T> implements FlowableSubscriber<T>, Subscription
// 没有实现 Disposable,而是实现了Subscription
public final class SerializedSubscriber<T> implements FlowableSubscriber<T>, Subscription
public class TestSubscriber<T>
extends BaseTestConsumer<T, TestSubscriber<T>>
implements FlowableSubscriber<T>, Subscription, Disposable
Observers 系列
public abstract class DisposableCompletableObserver implements CompletableObserver, Disposable
public abstract class DisposableMaybeObserver<T> implements MaybeObserver<T>, Disposable
public abstract class DisposableObserver<T> implements Observer<T>, Disposable
public abstract class DisposableSingleObserver<T> implements SingleObserver<T>, Disposable
public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable
public abstract class ResourceMaybeObserver<T> implements MaybeObserver<T>, Disposable
public abstract class ResourceObserver<T> implements Observer<T>, Disposable
public abstract class ResourceSingleObserver<T> implements SingleObserver<T>, Disposable
public final class SafeObserver<T> implements Observer<T>, Disposable
public final class SerializedObserver<T> implements Observer<T>, Disposable
public class TestObserver<T>
extends BaseTestConsumer<T, TestObserver<T>>
implements Observer<T>, Disposable, MaybeObserver<T>, SingleObserver<T>, CompletableObserver
参考资料
感谢以下文章作者
RxJava2 系列-1:一篇的比较全面的 RxJava2 方法总结
关于 RxJava 最友好的文章—— RxJava 2.0 全新来袭
RxJava2 只看这一篇文章就够了
RxJava 组合操作符
网友评论