RxJava 的 Subject

作者: fengzhizi715 | 来源:发表于2017-06-05 19:44 被阅读1815次
    streams everywhere.png

    Subject 是一种特殊的存在

    在前面一篇文章Cold Observable 和 Hot Observable中,曾经介绍过 Subject 既是 Observable 又是 Observer(Subscriber)。官网称 Subject 可以看成是一个桥梁或者代理。

    Subject的分类

    Subject包含四种类型分别是AsyncSubject、BehaviorSubject、ReplaySubject和PublishSubject。

    1. AsyncSubject

    Observer会接收AsyncSubject的onComplete()之前的最后一个数据。

    AsyncSubject<String> subject = AsyncSubject.create();
            subject.onNext("asyncSubject1");
            subject.onNext("asyncSubject2");
            subject.onComplete();
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    System.out.println("asyncSubject:"+s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    System.out.println("asyncSubject onError");  //不输出(异常才会输出)
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("asyncSubject:complete");  //输出 asyncSubject onComplete
                }
            });
    
            subject.onNext("asyncSubject3");
            subject.onNext("asyncSubject4");
    

    执行结果:

    asyncSubject:asyncSubject2
    asyncSubject:complete
    

    改一下代码,将subject.onComplete()放在最后。

            AsyncSubject<String> subject = AsyncSubject.create();
            subject.onNext("asyncSubject1");
            subject.onNext("asyncSubject2");
    
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    System.out.println("asyncSubject:"+s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    System.out.println("asyncSubject onError");  //不输出(异常才会输出)
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("asyncSubject:complete");  //输出 asyncSubject onComplete
                }
            });
    
            subject.onNext("asyncSubject3");
            subject.onNext("asyncSubject4");
            subject.onComplete();
    

    执行结果:

    asyncSubject:asyncSubject4
    asyncSubject:complete
    

    注意,subject.onComplete()必须要调用才会开始发送数据,否则Subscriber将不接收任何数据。

    2. BehaviorSubject

    Observer会接收到BehaviorSubject被订阅之前的最后一个数据,再接收订阅之后发射过来的数据。如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。

            BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
    
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    System.out.println("behaviorSubject:"+s); 
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    System.out.println("behaviorSubject onError");  //不输出(异常才会输出)
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("behaviorSubject:complete");  //输出 behaviorSubject onComplete
                }
            });
    
            subject.onNext("behaviorSubject2");
            subject.onNext("behaviorSubject3");
    

    执行结果:

    behaviorSubject:behaviorSubject1
    behaviorSubject:behaviorSubject2
    behaviorSubject:behaviorSubject3
    

    在这里,behaviorSubject1是默认值。因为执行了

    BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
    

    稍微改一下代码,在subscribe()之前,再发射一个事件。

            BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
            subject.onNext("behaviorSubject2");
    
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    System.out.println("behaviorSubject:"+s);  //输出asyncSubject:asyncSubject3
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    System.out.println("behaviorSubject onError");  //不输出(异常才会输出)
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("behaviorSubject:complete");  //输出 behaviorSubject onComplete
                }
            });
    
            subject.onNext("behaviorSubject3");
            subject.onNext("behaviorSubject4");
    

    执行结果:

    behaviorSubject:behaviorSubject2
    behaviorSubject:behaviorSubject3
    behaviorSubject:behaviorSubject4
    

    这次丢弃了默认值,而发射behaviorSubject2。
    因为BehaviorSubject 每次只会发射调用subscribe()方法之前的最后一个事件和调用subscribe()方法之后的事件。

    BehaviorSubject还可以缓存最近一次发出信息的数据。

    3. ReplaySubject

    ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。

            ReplaySubject<String> subject = ReplaySubject.create();
            subject.onNext("replaySubject1");
            subject.onNext("replaySubject2");
    
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    System.out.println("replaySubject:"+s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    System.out.println("replaySubject onError");  //不输出(异常才会输出)
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("replaySubject:complete");  //输出 replaySubject onComplete
                }
            });
    
            subject.onNext("replaySubject3");
            subject.onNext("replaySubject4");
    

    执行结果:

    replaySubject:replaySubject1
    replaySubject:replaySubject2
    replaySubject:replaySubject3
    replaySubject:replaySubject4
    

    稍微改一下代码,将create()改成createWithSize(1)只缓存订阅前最后发送的1条数据

            ReplaySubject<String> subject = ReplaySubject.createWithSize(1);
            subject.onNext("replaySubject1");
            subject.onNext("replaySubject2");
    
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    System.out.println("replaySubject:"+s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    System.out.println("replaySubject onError");  //不输出(异常才会输出)
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("replaySubject:complete");  //输出 replaySubject onComplete
                }
            });
    
            subject.onNext("replaySubject3");
            subject.onNext("replaySubject4");
    

    执行结果:

    replaySubject:replaySubject2
    replaySubject:replaySubject3
    replaySubject:replaySubject4
    

    这个执行结果跟BehaviorSubject是一样的。但是从并发的角度来看,ReplaySubject 在处理并发 subscribe() 和 onNext() 时会更加复杂。

    ReplaySubject除了可以限制缓存数据的数量和还能限制缓存的时间。使用createWithTime()即可。

    4. PublishSubject

    Observer只接收PublishSubject被订阅之后发送的数据。

            PublishSubject<String> subject = PublishSubject.create();
            subject.onNext("publicSubject1");
            subject.onNext("publicSubject2");
            subject.onComplete();
    
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    System.out.println("publicSubject:"+s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    System.out.println("publicSubject onError");  //不输出(异常才会输出)
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("publicSubject:complete");  //输出 publicSubject onComplete
                }
            });
    
            subject.onNext("publicSubject3");
            subject.onNext("publicSubject4");
    

    执行结果:

    publicSubject:complete
    

    因为subject在订阅之前,已经执行了onComplete()方法,所以无法发射数据。稍微改一下代码,将onComplete()方法放在最后。

            PublishSubject<String> subject = PublishSubject.create();
            subject.onNext("publicSubject1");
            subject.onNext("publicSubject2");
    
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    System.out.println("publicSubject:"+s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    System.out.println("publicSubject onError");  //不输出(异常才会输出)
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("publicSubject:complete");  //输出 publicSubject onComplete
                }
            });
    
            subject.onNext("publicSubject3");
            subject.onNext("publicSubject4");
            subject.onComplete();
    

    执行结果:

    publicSubject:publicSubject3
    publicSubject:publicSubject4
    publicSubject:complete
    

    最后,一句话总结一下四个Subject的特性。

    Subject 发射行为
    AsyncSubject 不论订阅发生在什么时候,只会发射最后一个数据
    BehaviorSubject 发送订阅之前一个数据和订阅之后的全部数据
    ReplaySubject 不论订阅发生在什么时候,都发射全部数据
    PublishSubject 发送订阅之后全部数据

    可能错过的事件

    Subject 作为一个Observable时,可以不停地调用onNext()来发送事件,直到遇到onComplete()才会结束。

    PublishSubject<String> subject = PublishSubject.create();
            subject.subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            System.out.println(s);
                        }
                    }, new Consumer<Throwable>() {
    
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
    
                        }
                    },new Action() {
                        @Override
                        public void run() throws Exception {
                            System.out.println("completed");
                        }
                    });
            subject.onNext("Foo");
            subject.onNext("Bar");
            subject.onComplete();
    

    执行的结果:

    Foo
    Bar
    completed
    

    如果,使用 subsribeOn 操作符将 subject 切换到IO线程,再使用 Thread.sleep(2000) 让主线程休眠2秒。

     PublishSubject<String> subject = PublishSubject.create();
            subject.subscribeOn(Schedulers.io())
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            System.out.println(s);
                        }
                    }, new Consumer<Throwable>() {
    
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
    
                        }
                    },new Action() {
                        @Override
                        public void run() throws Exception {
                            System.out.println("completed");
                        }
                    });
            subject.onNext("Foo");
            subject.onNext("Bar");
            subject.onComplete();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    

    这时,其执行的结果变为:

    completed
    

    为何会缺少打印Foo和Bar?

    因为,subject 发射元素的线程被指派到了 IO 线程,此时 IO 线程正在初始化还没起来,subject 发射前这两个元素Foo、Bar还在主线程中,主线程的这两个元素往 IO 线程转发的过程中由于 IO 线程还没有起来,所以就被丢弃了。此时,无论Thread睡了多少秒,Foo、Bar都不会被打印出来。

    其实,解决办法也很简单,将subject改成使用Observable.create()来替代,它允许为每个订阅者精确控制事件的发送,这样就不会缺少打印Foo和Bar。

    使用PublishSubject来实现简化的RxBus

    下面的代码是一个简化版本的Event Bus,在这里使用了PublishSubject。因为事件总线是基于发布/订阅模式实现的,如果某一事件在多个Activity/Fragment中被订阅的话,在App的任意地方一旦发布该事件,则多个订阅的地方都能够同时收到这一事件(在这里,订阅事件的Activity/Fragment不能被destory,一旦被destory就不能收到事件),这很符合Hot Observable的特性。所以,我们使用PublishSubject,考虑到多线程的情况,还需要使用 Subject 的 toSerialized() 方法。

    import io.reactivex.Observable;
    import io.reactivex.subjects.PublishSubject;
    import io.reactivex.subjects.Subject;
    
    public class RxBus {
    
        private final Subject<Object> mBus;
    
        private RxBus() {
            mBus = PublishSubject.create().toSerialized();
        }
    
        public static RxBus get() {
            return Holder.BUS;
        }
    
        public void post(Object obj) {
            mBus.onNext(obj);
        }
    
        public <T> Observable<T> toObservable(Class<T> tClass) {
            return mBus.ofType(tClass);
        }
    
        public Observable<Object> toObservable() {
            return mBus;
        }
    
        public boolean hasObservers() {
            return mBus.hasObservers();
        }
    
        private static class Holder {
            private static final RxBus BUS = new RxBus();
        }
    }
    

    在这里Subject的toSerialized(),使用SerializedSubject包装了原先的Subject。

        /**
         * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
         * onComplete methods, making them thread-safe.
         * <p>The method is thread-safe.
         * @return the wrapped and serialized subject
         */
        @NonNull
        public final Subject<T> toSerialized() {
            if (this instanceof SerializedSubject) {
                return this;
            }
            return new SerializedSubject<T>(this);
        }
    

    这个版本的Event Bus比较简单,并没有考虑到背压的情况,因为在 RxJava2.x 中 Subject 已经不再支持背压了。如果要增加背压的处理,可以使用Processor,我们需要将 PublishSubject 改成 PublishProcessor,对应的 Observable 也需要改成 Flowable。

    使用BehaviorSubject来实现预加载

    预加载可以很好的提高程序的用户体验。
    每当用户处于弱网络时,打开一个App可能出现一片空白或者一直在loading,那用户一定会很烦躁。此时,如果能够预先加载一些数据,例如上一次打开App时保存的数据,这样不至于会损伤App的用户体验。

    下面是借助 BehaviorSubject 的特性来实现一个简单的预加载类RxPreLoader。

    import io.reactivex.disposables.Disposable;
    import io.reactivex.functions.Consumer;
    import io.reactivex.subjects.BehaviorSubject;
    
    /**
     * Created by Tony Shen on 2017/6/2.
     */
    
    public class RxPreLoader<T> {
    
        //能够缓存订阅之前的最新数据
        private  BehaviorSubject<T> mData;
        private Disposable disposable;
    
        public RxPreLoader(T defaultValue) {
    
            mData = BehaviorSubject.createDefault(defaultValue);
        }
    
        /**
         * 发送事件
         * @param object
         */
        public void publish(T object) {
            mData.onNext(object);
        }
    
        /**
         * 订阅事件
         * @param onNext
         * @return
         */
        public  Disposable subscribe(Consumer onNext) {
            disposable = mData.subscribe(onNext);
            return disposable;
        }
    
        /**
         * 反订阅
         *
         */
        public void dispose() {
            if (disposable != null && !disposable.isDisposed()) {
                disposable.dispose();
                disposable = null;
            }
        }
    
        /**
         * 获取缓存数据的Subject
         *
         * @return
         */
        public BehaviorSubject<T> getCacheDataSubject() {
            return mData;
        }
    
        /**
         * 直接获取最近的一个数据
         *
         * @return
         */
        public T getLastCacheData() {
            return mData.getValue();
        }
    }
    

    可以考虑在基类的Activity/Fragment中也实现一个类似的RxPreLoader。

    总结

    RxJava 的 Subject 是一种特殊的存在,它的灵活性在使用时也会伴随着风险,没有用好它的话会错过事件,并且使用时还要小心 Subject 不是线程安全的。当然很多开源框架都在使用Subject,例如大名鼎鼎的RxLifecycle使用了BehaviorSubject。

    相关文章

      网友评论

      本文标题:RxJava 的 Subject

      本文链接:https://www.haomeiwen.com/subject/qjzkfxtx.html