美文网首页
RxJava学习笔记之Subject

RxJava学习笔记之Subject

作者: 初夏的雪 | 来源:发表于2020-06-02 16:49 被阅读0次

    Subject在ReactiveX是作为observer和observerable的一个bridge或者proxy。

    使用toSerialized()将Subject作为一个观察者,避免在多线程中调用On系列方法,导致无法顺序调用;

    源码类的定义如下:

    public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    }
    

    所以,它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

    Subject 有四种类型:

    1. AsyncSubject :

    发送Observable发送来的最后一个值;
    如果Observable未发送,他则不发送任何东西;
    如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知;

    下图是官方图片:

    传值图 发生异常图

    简单使用:

     AsyncSubject<String> asyncSubject = AsyncSubject.create();
            asyncSubject.onNext("asyncSubject 1");
            asyncSubject.onNext("asyncSubject 2");
            asyncSubject.onNext("asyncSubject 3");
            asyncSubject.onNext("asyncSubject 4");
            asyncSubject.onNext("asyncSubject 5");
            asyncSubject.onComplete();
            asyncSubject.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("AsyncSubject accept: ", s);
                }
            });
    
    
    执行结果:
    2020-06-02 16:31:44.469 6962-6962/com.example.rxjavestudydemo D/AsyncSubject accept:: asyncSubject 5
    

    说明:

    1. 不管订阅在什么位置发生关系;
    2. 只会执行最后一条发送的数据;

    2. BehaviorSubject:

    发送订阅关系发生时的最近的一次数据;
    如果没有最近的数据,则发送默认值;
    如果发生了错误而终止,则只会简单 的向前传递错误通知;

    官方图

    简单使用:

    BehaviorSubject<String> behaviorSubject = BehaviorSubject.createDefault("behaviorSubject default");
            behaviorSubject.onNext("behaviorSubject 1");
            behaviorSubject.onNext("behaviorSubject 2");
            behaviorSubject.onNext("behaviorSubject 3");
            behaviorSubject.onNext("behaviorSubject 4");
            behaviorSubject.onNext("behaviorSubject 5");
            behaviorSubject.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("behaviorSubject  accept: ", s);
                }
            });
            behaviorSubject.onComplete();
    
    执行结果:
    2020-06-02 17:38:46.375 24771-24771/com.example.rxjavestudydemo W/Settings: mValues not put! needsGenerationTracker: true currentGeneration: -1 name: enable_navbar value: null
    2020-06-02 17:38:46.524 24771-24771/com.example.rxjavestudydemo D/behaviorSubject  accept:: behaviorSubject 5
    

    3. PublishSubject:

    A. 只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者;
    B. 如果Observable发生异常或完成通知,则PublishSubject仅仅会将异常和完成的通知传递下去,并终止订阅关系。

    需要注意的是:

       PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生);
    
       ***风险:***
         在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。
    
       ***解决办法:***
    
       如果要确保来自原始Observable的所有数据都被分发,你需要这样做:
    
       或使用Create创建那个Observable以便手动给它引入"冷"Observable的行为(当所有观察者都已经订阅时才开始发射数据);
       或改用ReplaySubject。
    
    传值 异常终止

    简单使用:

     PublishSubject<String> publishSubject = PublishSubject.create();
            publishSubject.onNext("publishSubject 1");
            publishSubject.onNext("publishSubject 2");
            publishSubject.onNext("publishSubject 3");
            publishSubject.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("publishSubject", "accept: " + s);
                }
            });
            publishSubject.onNext("publishSubject 4");
            publishSubject.onNext("publishSubject 5");
            publishSubject.onComplete();
    
    执行结果:
    2020-06-02 16:45:04.511 10667-10667/com.example.rxjavestudydemo W/Settings: mValues not put! needsGenerationTracker: true currentGeneration: -1 name: enable_navbar value: null
    2020-06-02 16:45:04.646 10667-10667/com.example.rxjavestudydemo D/publishSubject: accept: publishSubject 4
    2020-06-02 16:45:04.646 10667-10667/com.example.rxjavestudydemo D/publishSubject: accept: publishSubject 5
    
    

    说明:

    1、订阅关系的发生,是按照代码的调用顺序来决定的;即根据调用的先后顺序来决定执行那些数据的发送;
    2、订阅关系发生后,不过有多少数据都会发送出去;
    3、Observable 一旦发生OnError()或者OnComplete() ,则只会传递相关的通知;

    4. ReplaySubject:

    ReplaySubject 会发射所有来自Observable的原始数据,不管什么时候订阅的观察者。但是也有一种特殊情况:当重放缓存增长到一定大小,或者是过了一段时间后,会丢弃原始数据。

    官方图

    简单使用:

    ReplaySubject<String> replaySubject = create();
            replaySubject.onNext("replaySubject 1");
            replaySubject.onNext("replaySubject 2");
            replaySubject.onNext("replaySubject 3");
            replaySubject.onNext("replaySubject 4");
            replaySubject.onNext("replaySubject 5");
            replaySubject.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("replaySubject", "accept: " + s);
                }
            });
            replaySubject.onComplete();
    
    执行结果:
    2020-06-02 17:30:09.364 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 1
    2020-06-02 17:30:09.364 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 2
    2020-06-02 17:30:09.364 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 3
    2020-06-02 17:30:09.365 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 4
    2020-06-02 17:30:09.365 22360-22360/com.example.rxjavestudydemo D/replaySubject: accept: replaySubject 5
    

    注意 :

    如果ReplaySubject 是一个观察者,不要在多个线程中调用它的On系列的方法,有可能出现非顺序调用;

    相关文章

      网友评论

          本文标题:RxJava学习笔记之Subject

          本文链接:https://www.haomeiwen.com/subject/exnuzhtx.html