美文网首页
Rxjava2 可连接的Observable(Connectab

Rxjava2 可连接的Observable(Connectab

作者: JiangMing_JIM | 来源:发表于2019-12-30 22:18 被阅读0次

    简要:

    需求了解:

    Rxjava中的普通的 Observable 在观察者订阅的时候就会发射数据,但是有的时候我们想自己控制数据的发射,比如在有指定的观察者或者全部的观察者订阅后开始发射数据,这个时候我们就要要用到Rxjava中的可连接的Observable来完成这个需求。

    这一节主要介绍 ConnectableObservable 和它的子类以及它们的操作符:

    • ConnectableObservable: 一个可连接的Observable,在订阅后不发射数据,调用 connect() 方法后开始发射数据。
    • Observable.publish():将一个Observable转换为一个可连接的Observable 。
    • ConnectableObservable.connect():指示一个可连接的Observable开始发射数据。
    • Observable.replay():确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。
    • ConnectableObservable.refCount():让一个可连接的Observable表现得像一个普通的Observable。
    • Observable.share():可以直接将Observable转换为一个具有ConnectableObservable特性的Observable对象,等价于Observable.publish().refCount()
    • Observable.replay():保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。

    1. ConnectableObservable

    一个可连接的Observable(ConnectableObservable)与普通的Observable差不多。不同之处:可连接的Observable在被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始。用这种方法,你可以等部分或者所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。

    img-ConnectableObservable
    注意: ConnectableObservable 的线程切换只能通过 replay 操作符来实现,普通 Observable 的 subscribeOn()observerOn() 在 ConnectableObservable 中不起作用。可以通过 replay 操作符的指定线程调度器的方式来进行线程的切换。

    Javadoc: ConnectableObservable

    2. Publish

    将普通的Observable转换为可连接的Observable(ConnectableObservable)。

    如果要使用可连接的Observable,可以使用Observable的 publish 操作符,来将相应转换为ConnectableObservable对象。

    有一个变体接受一个函数作为参数(publish(Function selector))。这个函数用原始Observable发射的数据作为参数,产生 一个新的数据作为 ConnectableObservable 给发射,替换原位置的数据项。实质是在签名的基础上添加一个 Map 操作。

    简单实例:

      // 1. publish()
      // 创建ConnectableObservable
      ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
              .publish();    // publish操作将Observable转化为一个可连接的Observable
    
        // 2. publish(Function<Observable<T>, ObservableSource<R>> selector)
      // 接受原始Observable的数据,产生一个新的Observable,可以对这个Observable进行函数处理
      Observable<String> publish = Observable.range(1, 5)
              .publish(new Function<Observable<Integer>, ObservableSource<String>>() {
    
                  @Override
                  public ObservableSource<String> apply(Observable<Integer> integerObservable) throws Exception {
                      System.out.println("--> apply(4): " + integerObservable.toString());
    
                      Observable<String> map = integerObservable.map(new Function<Integer, String>() {
    
                          @Override
                          public String apply(Integer integer) throws Exception {
                              return "[this is map value]: " + integer * integer;
                          }
                      });
                      return map;
                  }
              });
              
        publish.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("--> accept(4): " + s);
            }
        });
    

    输出:

    --> apply(4): io.reactivex.subjects.PublishSubject@3fb4f649
    --> accept(4): [this is map value]: 1
    --> accept(4): [this is map value]: 4
    --> accept(4): [this is map value]: 9
    --> accept(4): [this is map value]: 16
    --> accept(4): [this is map value]: 25
    

    Javadoc: Observable.publish()
    Javadoc: Observable.publish(Function<Observable<T>,ObservableSource<R> selector)

    3. Connect

    让一个可连接的Observable开始发射数据给订阅者。

    • 可连接的Observable (connectableObservable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了 Connect 操作符时才会开始。
    • RxJava中 connect 是 ConnectableObservable 接口的一个方法,使用 publish 操作符可以将一个普通的Observable转换为一个 ConnectableObservable 。
    • 调用 ConnectableObservable 的 connect 方法会让它后面的Observable开始给发射数据给订阅 者。connect 方法返回一个 Subscription 对象,可以调用它的 unsubscribe 方法让Observable停 止发射数据给观察者。
    • 即使没有任何订阅者订阅它,你也可以使用 connect 方法让一个Observable开始发射数据 (或者开始生成待发射的数据)。这样,你可以将一个"冷"的Observable变为"热"的。

    实例代码:

        // 1. publish()
        // 创建ConnectableObservable
        ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
                .publish();    // publish操作将Observable转化为一个可连接的Observable
    
        // 创建普通的Observable
        Observable<Integer> range = Observable.range(1, 5);
    
        // 1.1 connectableObservable在被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始
        connectableObservable.subscribe(new Observer<Integer>() {
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(1)");
            }
    
            @Override
            public void onNext(Integer integer) {
                System.out.println("--> onNext(1): " + integer);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(1): " + e);
            }
    
            @Override
            public void onComplete() {
                System.out.println("--> onComplete(1)");
            }
        });
    
        // 1.2 connectableObservable在被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始
        connectableObservable.subscribe(new Observer<Integer>() {
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(2)");
            }
    
            @Override
            public void onNext(Integer integer) {
                System.out.println("--> onNext(2): " + integer);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(2): " + e);
            }
    
            @Override
            public void onComplete() {
                System.out.println("--> onComplete(2)");
            }
        });
    
        // 1.3 普通Observable在被订阅时就会发射数据
        range.subscribe(new Observer<Integer>() {
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(3)");
            }
    
            @Override
            public void onNext(Integer integer) {
                System.out.println("--> onNext(3): " + integer);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(3): " + e);
            }
    
            @Override
            public void onComplete() {
                System.out.println("--> onComplete(3)");
            }
        });
    
        System.out.println("----------------start connect------------------");
        // 可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始发射数据
        // connectableObservable.connect();
        
        // 可选参数Consumer,返回一个Disposable对象,可以获取订阅状态和取消当前的订阅
        connectableObservable.connect(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                System.out.println("--> connect accept: " + disposable.isDisposed());
                // disposable.dispose();
            }
        });
    

    输出:

    --> onSubscribe(1)
    --> onSubscribe(2)
    --> onSubscribe(3)
    --> onNext(3): 1
    --> onNext(3): 2
    --> onNext(3): 3
    --> onNext(3): 4
    --> onNext(3): 5
    --> onComplete(3)
    ----------------start connect------------------
    --> connect accept: false
    --> onNext(1): 1
    --> onNext(2): 1
    --> onNext(1): 2
    --> onNext(2): 2
    --> onNext(1): 3
    --> onNext(2): 3
    --> onNext(1): 4
    --> onNext(2): 4
    --> onNext(1): 5
    --> onNext(2): 5
    --> onComplete(1)
    --> onComplete(2)
    

    Javadoc: ConnectableObservable.connect()
    Javadoc: ConnectableObservable.connect(Consumer<Disposable> connection)

    4. RefCount

    RefCount 的作用是让一个可连接的Observable行为像普通的Observable。

    RefCount 操作符把从一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable 时, RefCount 连接到下层的可连接Observable。 RefCount 跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。

    解析: refCount() 把 ConnectableObservable 变为一个普通的 Observable 但又保持了 ConnectableObservable 的特性。如果出现第一个 Observer,它就会自动调用 connect(),如果所有的 Observer 全部 dispose,那么它也会停止接受上游 Observable 的数据。

    实例代码:

        /**
         * refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler)
         *
         * 具有以下可选参数:
         * subscriberCount: 指定需要连接到上游的订阅者数量。注意:当订阅者满足此数量后才会处理
         * timeout:         所有订阅用户退订后断开连接前的等待时间
         * unit:            时间单位
         * scheduler:        断开连接之前要等待的目标调度器
         */
        Observable<Long> refCountObservable = Observable
                .intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
                .publish()
                .refCount()
                .subscribeOn(Schedulers.newThread())    // 指定订阅调度在子线程
                .observeOn(Schedulers.newThread());     // 指定观察者调度在子线程
            //  .refCount(1, 500, TimeUnit.MILLISECONDS, Schedulers.newThread());
    
        // 第1个订阅者
        refCountObservable.subscribe(new Observer<Long>() {
            private  Disposable disposable;
            private  int buff = 0;
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("----> onSubscribe(1): ");
                disposable = d;
            }
    
            @Override
            public void onNext(Long value) {
                if (buff == 3) {
                    disposable.dispose();   // 解除当前的订阅
                    System.out.println("----> Subscribe(1) is dispose! ");
                } else {
                    System.out.println("--> onNext(1): " + value);
                }
                buff++;
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(1): " + e);
            }
    
            @Override
            public void onComplete() {
                System.out.println("--> onComplete(1): ");
            }
        });
    
        // 第2个订阅者
        refCountObservable.doOnSubscribe(new Consumer<Disposable>() {
    
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        System.out.println("----> onSubscribe(2): ");
                    }
                })
                .delaySubscription(2, TimeUnit.SECONDS)   // 延迟2秒后订阅
                .subscribe(new Consumer<Long>() {
    
                    @Override
                    public void accept(Long value) throws Exception {
                        System.out.println("--> accept(2): " + value);
                    }
                });
    
        System.in.read();
    

    输出:

    ----> onSubscribe(1): 
    --> onNext(1): 1
    --> onNext(1): 2
    --> onNext(1): 3
    ----> onSubscribe(2): 
    ----> Subscribe(1) is dispose! 
    --> accept(2): 4
    --> accept(2): 5
    

    Javadoc: ConnectableObservable.refCount(subscriberCount, timeout, unit, scheduler)

    5. Share

    一个普通的Observable可以通过 publish 来将其转换为ConnectableObservable,然后可以调用其 refCount() 的方法将其转换为一个具有 ConnectableObservable 特性的Observable。

    其实Observable中还有一个操作方法,可以直接完成此步骤的操作,这就是 Observable.share() 操作符。

    可以来看一下share操作符的源码:

        public final Observable<T> share() {
            return publish().refCount();
        }
    

    通过源码可以知道,share() 方法可以直接将Observable转换为一个具有ConnectableObservable特性的Observable对象,即Observable.publish().refCount() == Observable.share()

    实例代码:

        // share()
        // 通过share() 同时应用 publish 和 refCount 操作
        Observable<Long> share = Observable
                .intervalRange(1, 5, 0, 500, TimeUnit.MILLISECONDS)
          //    .publish().refCount()
                .share()  // 等价于上面的操作
                .subscribeOn(Schedulers.newThread())    // 指定订阅调度在子线程
                .observeOn(Schedulers.newThread());     // 指定观察者调度在子线程
    
        // 1. 第一个订阅者
        share.subscribe(new Observer<Long>() {
            private  Disposable disposable;
            private  int buff = 0;
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("----> onSubscribe(1): ");
                disposable = d;
            }
    
            @Override
            public void onNext(Long value) {
                if (buff == 3) {
                    disposable.dispose();   // 解除当前的订阅
                    System.out.println("----> Subscribe(1) is dispose! ");
                } else {
                    System.out.println("--> onNext(1): " + value);
                }
                buff++;
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(1): " + e);
            }
    
            @Override
            public void onComplete() {
                System.out.println("--> onComplete(1): ");
            }
        });
    
        // 2. 第二个订阅者
        share.doOnSubscribe(new Consumer<Disposable>() {
    
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        System.out.println("----> onSubscribe(2): ");
                    }
                })
                .delaySubscription(1, TimeUnit.SECONDS)    // 延迟1秒后订阅
                .subscribe(new Consumer<Long>() {
    
                    @Override
                    public void accept(Long value) throws Exception {
                        System.out.println("--> accept(2): " + value);
                    }
                });
    
        System.in.read();
    

    输出:

    ----> onSubscribe(1): 
    --> onNext(1): 1
    --> onNext(1): 2
    --> onNext(1): 3
    ----> onSubscribe(2): 
    ----> Subscribe(1) is dispose! 
    --> accept(2): 4
    --> accept(2): 5
    

    Javadoc: Observable.share()

    6. Replay

    保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。

    img-Replay

    如果在将一个Observable转换为可连接的Observable之前对它使用 Replay 操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,可以缓存发射过的数据,即使那些观察者在这 个Observable开始给其它观察者发射数据之后才订阅。

    注意: replay操作符生成的 connectableObservable ,如果没有对缓存进行限定,那么无论观察者何时去订阅,都可以收到 Observable 完整的数据序列项。

    replay 操作符最好根据实际情况限定缓存的大小,否则数据发射过快或者较多时会占用很高的内存。replay 操作符有可以接受不同参数的变体,有的可以指定 replay 的最大缓存数量或者指定缓存时间,还可以指定调度器。

    • replay不仅可以缓存Observable的所有数据序列,也可以进行限定缓存大小的操作。
    • 还有有一种 replay 返回一个普通的Observable。它可以接受一个变换函数为参数,这个函数接受原始Observable发射的数据项为参数,返回结果Observable要发射的一项数据。因此,这个操作符其实是 replay 变换之后的数据项。

    实例代码:

        // 创建发射数据的Observable
        Observable<Long> observable = Observable
                .intervalRange(1,
                        10,
                        1,
                        500,
                        TimeUnit.MILLISECONDS,
                        Schedulers.newThread());
    
        /**
         * 1.1 replay(Scheduler scheduler)
         * 可选参数:scheduler, 指定线程调度器
         * 接受原始数据的所有数据
         */
    //  ConnectableObservable<Long> replay1 = observable.replay();
    
        /**
         * 1.2 replay(int bufferSize, Scheduler scheduler)
         * 可选参数:scheduler, 指定线程调度器
         * 只缓存 bufferSize 个最近的原始数据
         */
    //  ConnectableObservable<Long> replay1 = observable.replay(1); // 设置缓存大小为1, 从原数据中缓存最近的1个数据
    
        /**
         * 1.3 replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
         * 可选参数:scheduler, 指定线程调度器
         * 在订阅前指定的时间段内缓存 bufferSize 个数据, 注意计时开始是原始数据发射第1个数据项之后开始
         */
    //  ConnectableObservable<Long> replay1 = observable.replay(5, 1000, TimeUnit.MILLISECONDS);
    
        /**
         * 1.4 replay(long time, TimeUnit unit, Scheduler scheduler)
         * 可选参数:scheduler, 指定线程调度器
         * 在订阅前指定的时间段内缓存数据, 注意计时开始是原始数据发射第1个数据项之后开始
         */
       ConnectableObservable<Long> replay1 = observable.replay( 1000, TimeUnit.MILLISECONDS);
    
        // 进行 connect 操作
        replay1.connect();
    
        // 第一个观察者
        replay1.doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                System.out.println("----> onSubScribe(1-1)");
            }
        }).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("--> accept(1-1): " + aLong);
            }
        });
    
        // 第二个观察者(延迟1秒后订阅)
        replay1.doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                System.out.println("----> onSubScribe(1-2)");
            }
        }).delaySubscription(1, TimeUnit.SECONDS)
          .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("--> accept(1-2): " + aLong);
                }
          });
    
        // 第三个观察者(延迟2秒后订阅)
        replay1.doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                System.out.println("----> onSubScribe(1-3)");
            }
        }).delaySubscription(2, TimeUnit.SECONDS)
           .subscribe(new Consumer<Long>() {
               @Override
               public void accept(Long aLong) throws Exception {
                   System.out.println("--> accept(1-3): " + aLong);
               }
           });
    
        System.in.read();
        System.out.println("----------------------------------------------------------");
        /**
         * 2. replay(Function<Observable<T>, ObservableSource<R>> selector,
         * int bufferSize,                              可选参数: 指定从元数据序列数据的缓存大小
         * long time, TimeUnit unit,        可选参数: 指定缓存指定时间段的数据序列
         * Scheduler scheduler)                 可选参数: 指定线程调度器
         *
         * 接受一个变换函数 function 为参数,这个函数接受原始Observable发射的数据项为参数
         * 通过指定的函数处理后,返回一个处理后的Observable
         */
        Observable<String> replayObservable = observable.replay(new Function<Observable<Long>, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Observable<Long> longObservable) throws Exception {
                // 对原始数据进行处理
                Observable<String> map = longObservable.map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        return aLong + "² = " + aLong * aLong;  // 将原始数据进行平方处理,并转换为字符串数据类型
                    }
                });
    
                return map;
            }
        }, 1, Schedulers.newThread());
    
        replayObservable.subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread());
    
        // 第一个观察者
        replayObservable.doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                System.out.println("--> onSubScribe(2-1)");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("--> accept(2-1): " + s);
            }
        });
    
        // 订阅第二个观察者 (延迟2秒后订阅)
        replayObservable.doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                System.out.println("--> onSubScribe(2-2)");
            }
        }).delaySubscription(2, TimeUnit.SECONDS)
          .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("--> accept(2-2): " + s);
                }
           });
    
        System.in.read();
    

    输出:

    ----> onSubScribe(1-1)
    --> accept(1-1): 1
    --> accept(1-1): 2
    --> accept(1-1): 3
    ----> onSubScribe(1-2)
    --> accept(1-2): 2
    --> accept(1-2): 3
    --> accept(1-1): 4
    --> accept(1-2): 4
    --> accept(1-1): 5
    --> accept(1-2): 5
    ----> onSubScribe(1-3)
    --> accept(1-3): 4
    --> accept(1-3): 5
    --> accept(1-1): 6
    --> accept(1-2): 6
    --> accept(1-3): 6
    --> accept(1-1): 7
    --> accept(1-2): 7
    --> accept(1-3): 7
    --> accept(1-1): 8
    --> accept(1-2): 8
    --> accept(1-3): 8
    --> accept(1-1): 9
    --> accept(1-2): 9
    --> accept(1-3): 9
    --> accept(1-1): 10
    --> accept(1-2): 10
    --> accept(1-3): 10
    ----------------------------------------------------------
    --> onSubScribe(2-1)
    --> accept(2-1): 1² = 1
    --> accept(2-1): 2² = 4
    --> accept(2-1): 3² = 9
    --> accept(2-1): 4² = 16
    --> onSubScribe(2-2)
    --> accept(2-1): 5² = 25
    --> accept(2-2): 1² = 1
    --> accept(2-2): 2² = 4
    --> accept(2-1): 6² = 36
    --> accept(2-2): 3² = 9
    --> accept(2-1): 7² = 49
    --> accept(2-1): 8² = 64
    --> accept(2-2): 4² = 16
    --> accept(2-2): 5² = 25
    --> accept(2-1): 9² = 81
    --> accept(2-2): 6² = 36
    --> accept(2-1): 10² = 100
    --> accept(2-2): 7² = 49
    --> accept(2-2): 8² = 64
    --> accept(2-2): 9² = 81
    --> accept(2-2): 10² = 100
    

    Javadoc: Observable.replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
    Javadoc: Observable.replay(Function<Observable<T>,ObservableSource<R>> selector, int bufferSize, long time, TimeUnit unit, Scheduler scheduler)

    小结

    Rxjava 的连接操作符主要的核心是 ConnectableObservable 这个可连接的Observable对象的概念。可连接的 Observable 在被订阅时并不会直接发射数据,只有在他的 connect() 方法被调用时才会发射数据。便于更好的对数据的发射行为的控制,同时也对数据有很好的操作能力,可以缓存数据,指定缓存大小,时间片段缓存等。

    提示:以上使用的Rxjava2版本: 2.2.12

    Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

    实例代码:

    相关文章

      网友评论

          本文标题:Rxjava2 可连接的Observable(Connectab

          本文链接:https://www.haomeiwen.com/subject/xnkvoctx.html