Subject在ReactiveX是作为observer和observerable的一个bridge或者proxy。
使用toSerialized()将Subject作为一个观察者,避免在多线程中调用On系列方法,导致无法顺序调用;
源码类的定义如下:
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
}
所以,它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。
Subject 有四种类型:
1. AsyncSubject :
发送Observable发送来的最后一个值;
如果Observable未发送,他则不发送任何东西;
如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知;
下图是官方图片:
传值图 发生异常图简单使用:
AsyncSubject<String> asyncSubject = AsyncSubject.create();
asyncSubject.onNext("asyncSubject 1");
asyncSubject.onNext("asyncSubject 2");
asyncSubject.onNext("asyncSubject 3");
asyncSubject.onNext("asyncSubject 4");
asyncSubject.onNext("asyncSubject 5");
asyncSubject.onComplete();
asyncSubject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("AsyncSubject accept: ", s);
}
});
执行结果:
2020-06-02 16:31:44.469 6962-6962/com.example.rxjavestudydemo D/AsyncSubject accept:: asyncSubject 5
说明:
- 不管订阅在什么位置发生关系;
- 只会执行最后一条发送的数据;
2. BehaviorSubject:
官方图发送订阅关系发生时的最近的一次数据;
如果没有最近的数据,则发送默认值;
如果发生了错误而终止,则只会简单 的向前传递错误通知;
简单使用:
BehaviorSubject<String> behaviorSubject = BehaviorSubject.createDefault("behaviorSubject default");
behaviorSubject.onNext("behaviorSubject 1");
behaviorSubject.onNext("behaviorSubject 2");
behaviorSubject.onNext("behaviorSubject 3");
behaviorSubject.onNext("behaviorSubject 4");
behaviorSubject.onNext("behaviorSubject 5");
behaviorSubject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("behaviorSubject accept: ", s);
}
});
behaviorSubject.onComplete();
执行结果:
2020-06-02 17:38:46.375 24771-24771/com.example.rxjavestudydemo W/Settings: mValues not put! needsGenerationTracker: true currentGeneration: -1 name: enable_navbar value: null
2020-06-02 17:38:46.524 24771-24771/com.example.rxjavestudydemo D/behaviorSubject accept:: behaviorSubject 5
3. PublishSubject:
A. 只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者;
B. 如果Observable发生异常或完成通知,则PublishSubject仅仅会将异常和完成的通知传递下去,并终止订阅关系。
需要注意的是:
PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生);
***风险:***
在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。
***解决办法:***
如果要确保来自原始Observable的所有数据都被分发,你需要这样做:
或使用Create创建那个Observable以便手动给它引入"冷"Observable的行为(当所有观察者都已经订阅时才开始发射数据);
或改用ReplaySubject。
传值
异常终止
简单使用:
PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("publishSubject 1");
publishSubject.onNext("publishSubject 2");
publishSubject.onNext("publishSubject 3");
publishSubject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("publishSubject", "accept: " + s);
}
});
publishSubject.onNext("publishSubject 4");
publishSubject.onNext("publishSubject 5");
publishSubject.onComplete();
执行结果:
2020-06-02 16:45:04.511 10667-10667/com.example.rxjavestudydemo W/Settings: mValues not put! needsGenerationTracker: true currentGeneration: -1 name: enable_navbar value: null
2020-06-02 16:45:04.646 10667-10667/com.example.rxjavestudydemo D/publishSubject: accept: publishSubject 4
2020-06-02 16:45:04.646 10667-10667/com.example.rxjavestudydemo D/publishSubject: accept: publishSubject 5
说明:
1、订阅关系的发生,是按照代码的调用顺序来决定的;即根据调用的先后顺序来决定执行那些数据的发送;
2、订阅关系发生后,不过有多少数据都会发送出去;
3、Observable 一旦发生OnError()或者OnComplete() ,则只会传递相关的通知;
4. ReplaySubject:
官方图ReplaySubject 会发射所有来自Observable的原始数据,不管什么时候订阅的观察者。但是也有一种特殊情况:当重放缓存增长到一定大小,或者是过了一段时间后,会丢弃原始数据。
简单使用:
ReplaySubject<String> replaySubject = create();
replaySubject.onNext("replaySubject 1");
replaySubject.onNext("replaySubject 2");
replaySubject.onNext("replaySubject 3");
replaySubject.onNext("replaySubject 4");
replaySubject.onNext("replaySubject 5");
replaySubject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("replaySubject", "accept: " + s);
}
});
replaySubject.onComplete();
执行结果:
2020-06-02 17:30:09.364 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 1
2020-06-02 17:30:09.364 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 2
2020-06-02 17:30:09.364 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 3
2020-06-02 17:30:09.365 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 4
2020-06-02 17:30:09.365 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 5
注意 :
如果ReplaySubject 是一个观察者,不要在多个线程中调用它的On系列的方法,有可能出现非顺序调用;
网友评论