RxJava <Part 4>: Cold Observable

Author: NoBugException | Published: 2019-03-13 15:19
    (1) Cold Observable

    Define two observers:

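        // RxJava 2.x package (an assumption; RxJava 3 uses io.reactivex.rxjava3.*)
        import io.reactivex.functions.Consumer;

        // Each observer prints the thread name stored in the bean and the bean's hash code.
        Consumer<CountBean> consumer1 = new Consumer<CountBean>() {
            @Override
            public void accept(CountBean countBean) throws Exception {
                System.out.println("consumer1:" + "threadname:" + countBean.getThreadName() + "-----hashcode:" + countBean.hashCode());
            }
        };

        Consumer<CountBean> consumer2 = new Consumer<CountBean>() {
            @Override
            public void accept(CountBean countBean) throws Exception {
                System.out.println("consumer2:" + "threadname:" + countBean.getThreadName() + "-----hashcode:" + countBean.hashCode());
            }
        };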
    

    Define the observable:

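        // RxJava 2.x packages (an assumption; RxJava 3 uses io.reactivex.rxjava3.*)
        import io.reactivex.Observable;
        import io.reactivex.ObservableEmitter;
        import io.reactivex.ObservableOnSubscribe;
        import io.reactivex.schedulers.Schedulers;

        Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
            @Override
            public void subscribe(ObservableEmitter<CountBean> emitter) {
                // Runs once per subscription, so every observer gets a new CountBean.
                CountBean countBean = new CountBean();
                countBean.setThreadName(Thread.currentThread().getName());
                emitter.onNext(countBean);
            }
        }).observeOn(Schedulers.newThread());

        observable.subscribe(consumer1);
        observable.subscribe(consumer2);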
    

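    The code above is a typical Cold Observable.

    The run output (a screenshot in the original post) shows that the two subscriptions print different CountBean hash codes, i.e. each subscription operates on its own CountBean object.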
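
    The per-subscription behaviour described above can be sketched in plain Java without RxJava. ColdSource and ColdDemo are hypothetical names invented for this illustration, and the code is only a model of the idea, not how Observable.create is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a cold source; NOT RxJava's implementation.
final class ColdSource<T> {
    private final Supplier<T> producer;

    ColdSource(Supplier<T> producer) {
        this.producer = producer;
    }

    // Every subscription re-runs the producer, so each observer gets its own item.
    void subscribe(Consumer<T> observer) {
        observer.accept(producer.get());
    }
}

final class ColdDemo {
    // Subscribes twice and returns the two objects that were received.
    static List<Object> run() {
        List<Object> received = new ArrayList<>();
        ColdSource<Object> cold = new ColdSource<>(Object::new);
        cold.subscribe(received::add);
        cold.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        List<Object> received = run();
        // Distinct instances, matching the different hash codes in the article.
        System.out.println("same instance: " + (received.get(0) == received.get(1)));
    }
}
```

    Running ColdDemo prints "same instance: false", because the producer is invoked once for each subscribe call.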

    (2) Hot Observable

    Define two observers:

        Consumer<CountBean> consumer1 = new Consumer<CountBean>() {
            @Override
            public void accept(CountBean countBean) throws Exception {
                System.out.println("consumer1:" + "threadname:" + countBean.getThreadName() + "-----hashcode:" + countBean.hashCode());
            }
        };

        Consumer<CountBean> consumer2 = new Consumer<CountBean>() {
            @Override
            public void accept(CountBean countBean) throws Exception {
                System.out.println("consumer2:" + "threadname:" + countBean.getThreadName() + "-----hashcode:" + countBean.hashCode());
            }
        };
    

    Define the observable (this is where it differs from the Cold Observable):

    • Using the publish operator

        ConnectableObservable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
            @Override
            public void subscribe(ObservableEmitter<CountBean> emitter) {
                CountBean countBean = new CountBean();
                countBean.setThreadName(Thread.currentThread().getName());
                emitter.onNext(countBean);
            }
        }).observeOn(Schedulers.newThread()).publish();

        // Subscribe first so that neither observer can miss the item.
        observable.subscribe(consumer1);
        observable.subscribe(consumer2);

        // Required: nothing is emitted until connect() is called.
        observable.connect();
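
    To make the role of connect() concrete, here is a minimal plain-Java sketch of a connectable source. ConnectableSource and ConnectDemo are hypothetical names for this illustration only; the real ConnectableObservable is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a connectable source; NOT RxJava's ConnectableObservable.
final class ConnectableSource<T> {
    private final Supplier<T> producer;
    private final List<Consumer<T>> observers = new ArrayList<>();

    ConnectableSource(Supplier<T> producer) {
        this.producer = producer;
    }

    // Subscribing only registers the observer; nothing is produced yet.
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    // connect() runs the producer once and hands the same item to every registered observer.
    void connect() {
        T item = producer.get();
        for (Consumer<T> observer : observers) {
            observer.accept(item);
        }
    }
}

final class ConnectDemo {
    // Two observers subscribe before connect(), one after; returns everything received.
    static List<Object> run() {
        List<Object> received = new ArrayList<>();
        ConnectableSource<Object> hot = new ConnectableSource<>(Object::new);
        hot.subscribe(received::add);
        hot.subscribe(received::add);
        hot.connect();
        hot.subscribe(received::add); // too late: the item has already been emitted
        return received;
    }
}
```

    In this sketch the first two observers receive the very same instance, while the late observer receives nothing, which is why the order of subscribe and connect matters.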
      
    • Using the publish and refCount operators

        Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
            @Override
            public void subscribe(ObservableEmitter<CountBean> emitter) {
                CountBean countBean = new CountBean();
                countBean.setThreadName(Thread.currentThread().getName());
                emitter.onNext(countBean);
            }
        }).observeOn(Schedulers.newThread()).publish().refCount();

        // The first subscribe() connects to the source. consumer2 receives the item only if it
        // subscribes before the new thread delivers it, so this example relies on timing.
        observable.subscribe(consumer1);
        observable.subscribe(consumer2);
    

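    This is much simpler than the first approach, because no explicit connect() call is needed.
    Under the hood, refCount maintains a reference counter of subscribers: it connects to the source when the first observer subscribes and disconnects when the last one unsubscribes or is disposed.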
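
    The reference counting can be pictured with a small plain-Java sketch. RefCountSketch and RefCountDemo are hypothetical names, the class is single-threaded on purpose, and the "connected" flag merely stands in for the upstream connection; this is an illustration of the idea rather than RxJava's refCount implementation.

```java
// Hypothetical, single-threaded reference counter; NOT RxJava's refCount().
final class RefCountSketch {
    private int subscribers;
    private boolean connected;

    // Registers a subscriber and returns a Runnable that disposes that subscription.
    Runnable subscribe() {
        if (++subscribers == 1) {
            connected = true; // first subscriber: connect to the upstream
        }
        return new Runnable() {
            private boolean disposed;

            @Override
            public void run() {
                if (disposed) {
                    return; // disposing twice must not decrement twice
                }
                disposed = true;
                if (--subscribers == 0) {
                    connected = false; // last subscriber gone: disconnect
                }
            }
        };
    }

    boolean isConnected() {
        return connected;
    }
}

final class RefCountDemo {
    // Returns the connection state with two subscribers, after one disposes, and after both dispose.
    static boolean[] run() {
        RefCountSketch shared = new RefCountSketch();
        Runnable first = shared.subscribe();
        Runnable second = shared.subscribe();
        boolean withBoth = shared.isConnected();
        first.run();
        boolean afterOne = shared.isConnected();
        second.run();
        boolean afterAll = shared.isConnected();
        return new boolean[] {withBoth, afterOne, afterAll};
    }
}
```

    Here the connection stays open as long as at least one subscriber remains and is closed only when the counter drops back to zero.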

    • Using the share operator

        Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
            @Override
            public void subscribe(ObservableEmitter<CountBean> emitter) {
                CountBean countBean = new CountBean();
                countBean.setThreadName(Thread.currentThread().getName());
                emitter.onNext(countBean);
            }
        }).observeOn(Schedulers.newThread()).share();

        // share() connects to the source on the first subscribe().
        observable.subscribe(consumer1);
        observable.subscribe(consumer2);
      

    The source of share shows that it is simply publish().refCount():

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> share() {
        return publish().refCount();
    }
    
    • Using a Subject

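        Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
            @Override
            public void subscribe(ObservableEmitter<CountBean> emitter) {
                CountBean countBean = new CountBean();
                countBean.setThreadName(Thread.currentThread().getName());
                emitter.onNext(countBean);
            }
        }).observeOn(Schedulers.newThread());

        PublishSubject<CountBean> subject = PublishSubject.create();

        // Register the observers on the subject first; a PublishSubject does not replay
        // items that were emitted before an observer subscribed.
        subject.subscribe(consumer1);
        subject.subscribe(consumer2);

        // The subject acts as the observer of the source and relays its item to both consumers.
        observable.subscribe(subject);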
      

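    The parent class of PublishSubject is Subject. The source of Subject shows that it extends Observable and also implements Observer, so a Subject is both an observer and an observable at the same time. With that in mind, the code above is much easier to follow.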
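
    The dual role can be sketched in plain Java as follows. SubjectSketch and SubjectDemo are hypothetical names made up for this illustration, and java.util.function.Consumer stands in for RxJava's Observer; it shows the shape of the idea, not PublishSubject itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical subject: it accepts items (observer side) and can be subscribed to (observable side).
final class SubjectSketch<T> implements Consumer<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    // Observable side: downstream observers register here.
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    // Observer side: every item received from upstream is relayed to all registered observers.
    @Override
    public void accept(T item) {
        for (Consumer<T> observer : observers) {
            observer.accept(item);
        }
    }
}

final class SubjectDemo {
    // Bridges a cold-style upstream to two observers through one subject.
    static List<Object> run() {
        List<Object> received = new ArrayList<>();
        SubjectSketch<Object> subject = new SubjectSketch<>();
        subject.subscribe(received::add);
        subject.subscribe(received::add);

        // A cold-style upstream: it creates a new object for whoever subscribes to it.
        Consumer<Consumer<Object>> upstream = observer -> observer.accept(new Object());

        // The subject is the single subscriber of the upstream, so only one object is created.
        upstream.accept(subject);
        return received;
    }
}
```

    Because the upstream sees only one subscriber (the subject), it produces one object, and both downstream observers receive that same instance.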

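    The above covers four ways to write a Hot Observable. In the author's run they all produced the same result (shown as a screenshot in the original post).

    That is, no matter how many observers subscribe, they all operate on the same object. The author also describes this as thread-safe; note that sharing one instance does not by itself protect mutable state in CountBean, so observers that modify it from different threads still need their own synchronization.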
