美文网首页
RxJava系列之Publish,Share,Refcount操

RxJava系列之Publish,Share,Refcount操

作者: 代码改变人生 | 来源:发表于2019-08-04 15:57 被阅读0次

    1. Observable的分类——Cold 和 Hot

    • Hot Observable无论有没有Subscriber订阅,事件始终都会发射。当Hot Observable有多个订阅者时,Hot Observable与订阅者们是一对多的关系,即可以与多个订阅者共享信息。
    • Cold Observable只有 Subscriber订阅时,才开始发射数据流, Cold Observable与订阅者只能是一对一的关系,即当有多个不同的订阅者时,消息是重新完整发送的。也就是说对 Cold Observable 而言,有多个Subscriber的时候,他们各自的事件是独立的。

    2. Cold Observable 如何转换成 Hot Observable

    (1) publish操作符
    • publish将普通的Observable转换为可连接的Observable


      publish.png

    其实可连接的Observable类似于普通的Observable,区别在于它在订阅时才发射数据,只有当使用Connect操作符才开始。 通过这种方式,可以选择Observable的发射时间。

    使用 publish 操作符,可以让 Cold Observable 转换成 Hot Observable。它将原先的 Observable 转换成 ConnectableObservable。

            Consumer<Long> subscriber1 = new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    System.out.println("subscriber1: "+aLong);
                }
            };
    
            Consumer<Long> subscriber2 = new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    System.out.println("   subscriber2: "+aLong);
                }
            };
    
            ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                    Observable.interval(5, TimeUnit.MILLISECONDS,Schedulers.computation())
                            .take(Integer.MAX_VALUE)
                            .subscribe(e::onNext);
                }
            }).observeOn(Schedulers.newThread()).publish();
            //注意生成的 ConnectableObservable 需要调用connect()才能真正执行。
            observable.connect();
            observable.subscribe(subscriber1);
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            observable.subscribe(subscriber2);
    
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    

    执行结果如下:

    subscriber1: 0
    subscriber1: 1
    subscriber1: 2
       subscriber2: 2
    subscriber1: 3
       subscriber2: 3
    subscriber1: 4
       subscriber2: 4
    subscriber1: 5
       subscriber2: 5
    subscriber1: 6
       subscriber2: 6
    

    3. Hot Observable 如何转换成 Cold Observable

    (1) refCount操作符

    该操作符可以使Connectable Observable的行为类似于普通的Observable


    refcount.png

    RefCount操作符把从一个可连接的 Observable 连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。

    如果所有的订阅者都取消订阅了,则数据流停止。如果重新订阅则重新开始数据流。

            Consumer<Long> subscriber1 = new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    System.out.println("subscriber1: "+aLong);
                }
            };
    
            Consumer<Long> subscriber2 = new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    System.out.println("   subscriber2: "+aLong);
                }
            };
    
            ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                    Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                            .take(Integer.MAX_VALUE)
                            .subscribe(e::onNext);
                }
            }).observeOn(Schedulers.newThread()).publish();
            connectableObservable.connect();
    
            Observable<Long> observable = connectableObservable.refCount();
    
            Disposable disposable1 = observable.subscribe(subscriber1);
            Disposable disposable2 = observable.subscribe(subscriber2);
    
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            disposable1.dispose();
            disposable2.dispose();
    
            System.out.println("重新开始数据流");
    
            disposable1 = observable.subscribe(subscriber1);
            disposable2 = observable.subscribe(subscriber2);
    
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    

    执行结果:

    subscriber1: 0
       subscriber2: 0
    subscriber1: 1
       subscriber2: 1
    重新开始数据流
    subscriber1: 0
       subscriber2: 0
    subscriber1: 1
       subscriber2: 1
    
    注:如果不是所有的订阅者都取消了订阅,只取消了部分。部分的订阅者重新开始订阅,则不会从头开始数据流。
            Consumer<Long> subscriber1 = new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    System.out.println("subscriber1: "+aLong);
                }
            };
    
            Consumer<Long> subscriber2 = new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    System.out.println("   subscriber2: "+aLong);
                }
            };
    
            Consumer<Long> subscriber3 = new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    System.out.println("      subscriber3: "+aLong);
                }
            };
    
            ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                    Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                            .take(Integer.MAX_VALUE)
                            .subscribe(e::onNext);
                }
            }).observeOn(Schedulers.newThread()).publish();
            connectableObservable.connect();
    
            Observable<Long> observable = connectableObservable.refCount();
    
            Disposable disposable1 = observable.subscribe(subscriber1);
            Disposable disposable2 = observable.subscribe(subscriber2);
            observable.subscribe(subscriber3);
    
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            disposable1.dispose();
            disposable2.dispose();
    
            System.out.println("subscriber1、subscriber2 重新订阅");
    
            disposable1 = observable.subscribe(subscriber1);
            disposable2 = observable.subscribe(subscriber2);
    
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    

    执行结果:

    subscriber1: 0
       subscriber2: 0
          subscriber3: 0
    subscriber1: 1
       subscriber2: 1
          subscriber3: 1
    subscriber1、subscriber2 重新订阅
          subscriber3: 2
    subscriber1: 2
       subscriber2: 2
          subscriber3: 3
    subscriber1: 3
       subscriber2: 3
          subscriber3: 4
    subscriber1: 4
       subscriber2: 4
    
    share操作符

    share操作符封装了publish().refCount()调用,可以看其源码。


    share.png

    相关文章

      网友评论

          本文标题:RxJava系列之Publish,Share,Refcount操

          本文链接:https://www.haomeiwen.com/subject/pxhydctx.html