RxJava <Part 4>: Cold Observabl

Author: NoBugException | Published: 2019-03-13 15:19

(1) Cold Observable

Define two observers:

    // Assuming RxJava 2 (package io.reactivex)
    import io.reactivex.functions.Consumer;

    // Each observer prints the thread name stored in the bean and the bean's hash code.
    Consumer<CountBean> consumer1 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer1:"+"threadname:"+countBean.getThreadName()+"-----hashcode:"+countBean.hashCode());
        }
    };
    Consumer<CountBean> consumer2 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer2:"+"threadname:"+countBean.getThreadName()+"-----hashcode:"+countBean.hashCode());
        }
    };
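
The observers above call getThreadName() on a CountBean, a class the article does not show. The following is only a minimal sketch, assuming CountBean is a plain bean that stores the thread name. The accessor names are inferred from the calls above; the field and the demo class CountBeanDemo are assumptions made for illustration.

```java
// Hypothetical demo class, added only so the sketch can be run on its own.
class CountBeanDemo {
    public static void main(String[] args) {
        CountBean bean = new CountBean();
        bean.setThreadName(Thread.currentThread().getName());
        System.out.println("threadname:" + bean.getThreadName()
                + "-----hashcode:" + bean.hashCode());
    }
}

// Assumed shape of CountBean: the article never shows this class.
class CountBean {
    private String threadName;

    public String getThreadName() {
        return threadName;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }
    // hashCode() is not overridden, so the printed value is the identity
    // hash code of the instance.
}
```

Because hashCode() is left untouched in this sketch, comparing the printed hash codes is a rough way to tell whether two observers received the same instance.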

Define the observable and subscribe both observers:

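    // Assuming RxJava 2 (package io.reactivex)
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.schedulers.Schedulers;

    Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
        @Override
        public void subscribe(ObservableEmitter<CountBean> emitter) {
            // Runs once per subscription, on the thread that calls subscribe().
            CountBean countBean = new CountBean();
            countBean.setThreadName(Thread.currentThread().getName());
            emitter.onNext(countBean);
        }
    }).observeOn(Schedulers.newThread()); // observers are notified on a new thread

    observable.subscribe(consumer1);
    observable.subscribe(consumer2);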

The code above is a typical Cold Observable.

[Image: screenshot of the console output]

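The output shows that the CountBean hash codes differ between the two subscriptions, i.e. each observer operates on a different CountBean object. This is because the function passed to create runs once for every subscription.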
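
The behaviour described above can be modelled without RxJava. This is a simplified sketch, not RxJava's implementation: ColdSource and ColdSourceDemo are hypothetical names, and the model ignores threads, errors and completion. It only shows that a cold source runs its producer again for each subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified model of a cold source; ColdSource is a hypothetical name,
// not an RxJava type.
class ColdSourceDemo {

    static final class ColdSource<T> {
        private final Supplier<T> producer;

        ColdSource(Supplier<T> producer) {
            this.producer = producer;
        }

        // Every subscription runs the producer again, so each observer
        // receives a value that was created just for it.
        void subscribe(Consumer<T> observer) {
            observer.accept(producer.get());
        }
    }

    // Subscribes twice and returns the objects the observers received.
    static List<Object> subscribeTwice() {
        List<Object> received = new ArrayList<>();
        ColdSource<Object> source = new ColdSource<>(Object::new);
        source.subscribe(received::add);
        source.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        List<Object> received = subscribeTwice();
        // prints "same instance: false"
        System.out.println("same instance: " + (received.get(0) == received.get(1)));
    }
}
```

The two received objects are distinct instances, which matches the different hash codes reported for the two consumers.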

(2) Hot Observable

Define two observers:

    // Same observers as in section (1).
    Consumer<CountBean> consumer1 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer1:"+"threadname:"+countBean.getThreadName()+"-----hashcode:"+countBean.hashCode());
        }
    };
    Consumer<CountBean> consumer2 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer2:"+"threadname:"+countBean.getThreadName()+"-----hashcode:"+countBean.hashCode());
        }
    };

Define the observable (this is where it differs from the Cold Observable):

  • Using the publish operator

      ConnectableObservable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
          @Override
          public void subscribe(ObservableEmitter<CountBean> emitter) {
              CountBean countBean = new CountBean();
              countBean.setThreadName(Thread.currentThread().getName());
              emitter.onNext(countBean);
          }
      }).observeOn(Schedulers.newThread()).publish();

      // Subscribe first: observers that subscribe after connect() may miss the item.
      observable.subscribe(consumer1);
      observable.subscribe(consumer2);

      // Required: nothing is emitted until connect() is called.
      observable.connect();
    
  • Using the publish and refCount operators

    Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
        @Override
        public void subscribe(ObservableEmitter<CountBean> emitter) {
            CountBean countBean = new CountBean();
            countBean.setThreadName(Thread.currentThread().getName());
            emitter.onNext(countBean);
        }
    }).observeOn(Schedulers.newThread()).publish().refCount();

    // refCount() connects as soon as consumer1 subscribes, so consumer2 only
    // receives the item if it subscribes before the item is delivered.
    observable.subscribe(consumer1);
    observable.subscribe(consumer2);

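This is much simpler than the first approach, since connect() no longer has to be called by hand.
Under the hood, refCount maintains a reference counter: it connects to the source when the first observer subscribes and disposes the connection when the last observer unsubscribes.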
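
The reference-counting idea can be sketched in plain Java. This is a simplified model under stated assumptions, not RxJava's actual implementation: RefCountedConnection and RefCountDemo are hypothetical names, and the "connection" is reduced to a boolean flag.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of reference counting. RefCountedConnection is a
// hypothetical name; this is not RxJava's actual implementation.
class RefCountDemo {

    static final class RefCountedConnection {
        private int subscribers;
        private boolean connected;

        // The first subscriber opens the connection to the upstream source.
        synchronized void subscribe() {
            subscribers++;
            if (subscribers == 1) {
                connected = true;
            }
        }

        // The last subscriber to leave closes the connection again.
        synchronized void dispose() {
            if (subscribers == 0) {
                return; // nothing to release
            }
            subscribers--;
            if (subscribers == 0) {
                connected = false;
            }
        }

        synchronized boolean isConnected() {
            return connected;
        }
    }

    // Records the connection state after each subscribe/dispose step.
    static List<Boolean> trace() {
        RefCountedConnection connection = new RefCountedConnection();
        List<Boolean> states = new ArrayList<>();
        connection.subscribe();
        states.add(connection.isConnected());
        connection.subscribe();
        states.add(connection.isConnected());
        connection.dispose();
        states.add(connection.isConnected());
        connection.dispose();
        states.add(connection.isConnected());
        return states;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints [true, true, true, false]
    }
}
```

The connection stays open while at least one subscriber remains and is closed only after the last one leaves.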

  • Using the share operator

      Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
          @Override
          public void subscribe(ObservableEmitter<CountBean> emitter) {
              CountBean countBean = new CountBean();
              countBean.setThreadName(Thread.currentThread().getName());
              emitter.onNext(countBean);
          }
      }).observeOn(Schedulers.newThread()).share();

      // share() behaves like publish().refCount(): the first subscription connects.
      observable.subscribe(consumer1);
      observable.subscribe(consumer2);
    

The source code of share shows that share is simply publish().refCount():

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> share() {
        return publish().refCount();
    }

  • Using a Subject

      Observable<CountBean> observable = Observable.create(new ObservableOnSubscribe<CountBean>() {
          @Override
          public void subscribe(ObservableEmitter<CountBean> emitter) {
              CountBean countBean = new CountBean();
              countBean.setThreadName(Thread.currentThread().getName());
              emitter.onNext(countBean);
          }
      }).observeOn(Schedulers.newThread());

      PublishSubject<CountBean> subject = PublishSubject.create();

      // Attach the observers to the subject first; PublishSubject does not
      // replay items to observers that subscribe later.
      subject.subscribe(consumer1);
      subject.subscribe(consumer2);

      // Then let the subject observe the source, which starts the emission.
      observable.subscribe(subject);
    

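PublishSubject's parent class is Subject. The source of Subject shows that it extends Observable and also implements Observer, so a Subject has the characteristics of both an observer and an observable. With that in mind, the code above is much easier to understand.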
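
The dual role of a Subject can be illustrated with a small plain-Java model. This is only a sketch under assumptions: SimpleSubject and SubjectDemo are hypothetical names, java.util.function.Consumer stands in for the Observer interface, and errors and completion are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Simplified model of a subject; SimpleSubject is a hypothetical name,
// not an RxJava type.
class SubjectDemo {

    static final class SimpleSubject<T> implements Consumer<T> {
        private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

        // Observable side: other observers can subscribe to the subject.
        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        // Observer side: the subject itself can be handed to an upstream
        // source and relays every value to its current observers.
        @Override
        public void accept(T value) {
            for (Consumer<T> observer : observers) {
                observer.accept(value);
            }
        }
    }

    // Two observers subscribe to the subject, then the subject is attached
    // to an upstream source that emits one object.
    static List<Object> relayOnce() {
        List<Object> received = new ArrayList<>();
        SimpleSubject<Object> subject = new SimpleSubject<>();
        subject.subscribe(received::add);
        subject.subscribe(received::add);
        Consumer<Consumer<Object>> upstream = downstream -> downstream.accept(new Object());
        upstream.accept(subject);
        return received;
    }

    public static void main(String[] args) {
        List<Object> received = relayOnce();
        // prints "same instance: true"
        System.out.println("same instance: " + (received.get(0) == received.get(1)));
    }
}
```

The upstream source sees a single consumer (the subject), so it produces one object, and the subject passes that same object to both of its observers.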

The above covers four ways to write a Hot Observable. They all produce the same output:

[Image: screenshot of the console output]

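In other words, no matter how many observers subscribe, they all operate on the same CountBean object. Subscribing to these shared sources is thread-safe, but the shared CountBean itself is not protected, so observers that modify it still need their own synchronization.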
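
The shared-object behaviour, and the reason the order of subscribe and connect matters, can be modelled in plain Java. This is a simplified sketch, not RxJava's implementation: ConnectableSource and HotSourceDemo are hypothetical names, and the model is single-threaded and does not replay items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified model of a connectable (hot) source; ConnectableSource is a
// hypothetical name, not an RxJava type.
class HotSourceDemo {

    static final class ConnectableSource<T> {
        private final Supplier<T> producer;
        private final List<Consumer<T>> observers = new ArrayList<>();

        ConnectableSource(Supplier<T> producer) {
            this.producer = producer;
        }

        // Subscribing only registers the observer; nothing is produced yet.
        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        // Runs the producer once and hands the same value to every observer
        // that is registered at this moment.
        void connect() {
            T value = producer.get();
            for (Consumer<T> observer : observers) {
                observer.accept(value);
            }
        }
    }

    static List<Object> subscribeTwiceThenConnect() {
        List<Object> received = new ArrayList<>();
        ConnectableSource<Object> source = new ConnectableSource<>(Object::new);
        source.subscribe(received::add);
        source.subscribe(received::add);
        source.connect();
        return received;
    }

    static List<Object> connectThenSubscribe() {
        List<Object> received = new ArrayList<>();
        ConnectableSource<Object> source = new ConnectableSource<>(Object::new);
        source.connect();                // the value is produced with no observers
        source.subscribe(received::add); // too late: nothing is replayed
        return received;
    }

    public static void main(String[] args) {
        List<Object> shared = subscribeTwiceThenConnect();
        // prints "same instance: true"
        System.out.println("same instance: " + (shared.get(0) == shared.get(1)));
        // prints "late subscriber received: 0"
        System.out.println("late subscriber received: " + connectThenSubscribe().size());
    }
}
```

In this model a subscriber that arrives after connect() receives nothing, which is why subscribing before connecting is the safer order.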

Article link: https://www.haomeiwen.com/subject/rlhzpqtx.html