美文网首页
RxJava学习笔记之Subject

RxJava学习笔记之Subject

作者: 初夏的雪 | 来源:发表于2020-06-02 16:49 被阅读0次

Subject在ReactiveX是作为observer和observerable的一个bridge或者proxy。

使用toSerialized()将Subject作为一个观察者,避免在多线程中调用On系列方法,导致无法顺序调用;

源码类的定义如下:

public abstract class Subject<T> extends Observable<T> implements Observer<T> {
}

所以,它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

Subject 有四种类型:

1. AsyncSubject :

发送Observable发送来的最后一个值;
如果Observable未发送,他则不发送任何东西;
如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知;

下图是官方图片:

传值图 发生异常图

简单使用:

 AsyncSubject<String> asyncSubject = AsyncSubject.create();
        asyncSubject.onNext("asyncSubject 1");
        asyncSubject.onNext("asyncSubject 2");
        asyncSubject.onNext("asyncSubject 3");
        asyncSubject.onNext("asyncSubject 4");
        asyncSubject.onNext("asyncSubject 5");
        asyncSubject.onComplete();
        asyncSubject.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("AsyncSubject accept: ", s);
            }
        });


执行结果:
2020-06-02 16:31:44.469 6962-6962/com.example.rxjavestudydemo D/AsyncSubject accept:: asyncSubject 5

说明:

  1. 不管订阅在什么位置发生关系;
  2. 只会执行最后一条发送的数据;

2. BehaviorSubject:

发送订阅关系发生时的最近的一次数据;
如果没有最近的数据,则发送默认值;
如果发生了错误而终止,则只会简单 的向前传递错误通知;

官方图

简单使用:

BehaviorSubject<String> behaviorSubject = BehaviorSubject.createDefault("behaviorSubject default");
        behaviorSubject.onNext("behaviorSubject 1");
        behaviorSubject.onNext("behaviorSubject 2");
        behaviorSubject.onNext("behaviorSubject 3");
        behaviorSubject.onNext("behaviorSubject 4");
        behaviorSubject.onNext("behaviorSubject 5");
        behaviorSubject.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("behaviorSubject  accept: ", s);
            }
        });
        behaviorSubject.onComplete();

执行结果:
2020-06-02 17:38:46.375 24771-24771/com.example.rxjavestudydemo W/Settings: mValues not put! needsGenerationTracker: true currentGeneration: -1 name: enable_navbar value: null
2020-06-02 17:38:46.524 24771-24771/com.example.rxjavestudydemo D/behaviorSubject  accept:: behaviorSubject 5

3. PublishSubject:

A. 只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者;
B. 如果Observable发生异常或完成通知,则PublishSubject仅仅会将异常和完成的通知传递下去,并终止订阅关系。

需要注意的是:

   PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生);

   ***风险:***
     在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。

   ***解决办法:***

   如果要确保来自原始Observable的所有数据都被分发,你需要这样做:

   或使用Create创建那个Observable以便手动给它引入"冷"Observable的行为(当所有观察者都已经订阅时才开始发射数据);
   或改用ReplaySubject。
传值 异常终止

简单使用:

 PublishSubject<String> publishSubject = PublishSubject.create();
        publishSubject.onNext("publishSubject 1");
        publishSubject.onNext("publishSubject 2");
        publishSubject.onNext("publishSubject 3");
        publishSubject.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("publishSubject", "accept: " + s);
            }
        });
        publishSubject.onNext("publishSubject 4");
        publishSubject.onNext("publishSubject 5");
        publishSubject.onComplete();

执行结果:
2020-06-02 16:45:04.511 10667-10667/com.example.rxjavestudydemo W/Settings: mValues not put! needsGenerationTracker: true currentGeneration: -1 name: enable_navbar value: null
2020-06-02 16:45:04.646 10667-10667/com.example.rxjavestudydemo D/publishSubject: accept: publishSubject 4
2020-06-02 16:45:04.646 10667-10667/com.example.rxjavestudydemo D/publishSubject: accept: publishSubject 5

说明:

1、订阅关系的发生,是按照代码的调用顺序来决定的;即根据调用的先后顺序来决定执行那些数据的发送;
2、订阅关系发生后,不过有多少数据都会发送出去;
3、Observable 一旦发生OnError()或者OnComplete() ,则只会传递相关的通知;

4. ReplaySubject:

ReplaySubject 会发射所有来自Observable的原始数据,不管什么时候订阅的观察者。但是也有一种特殊情况:当重放缓存增长到一定大小,或者是过了一段时间后,会丢弃原始数据。

官方图

简单使用:

ReplaySubject<String> replaySubject = create();
        replaySubject.onNext("replaySubject 1");
        replaySubject.onNext("replaySubject 2");
        replaySubject.onNext("replaySubject 3");
        replaySubject.onNext("replaySubject 4");
        replaySubject.onNext("replaySubject 5");
        replaySubject.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("replaySubject", "accept: " + s);
            }
        });
        replaySubject.onComplete();

执行结果:
2020-06-02 17:30:09.364 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 1
2020-06-02 17:30:09.364 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 2
2020-06-02 17:30:09.364 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 3
2020-06-02 17:30:09.365 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 4
2020-06-02 17:30:09.365 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 5

注意 :

如果ReplaySubject 是一个观察者,不要在多个线程中调用它的On系列的方法,有可能出现非顺序调用;

相关文章

网友评论

      本文标题:RxJava学习笔记之Subject

      本文链接:https://www.haomeiwen.com/subject/exnuzhtx.html