美文网首页RxJavaAndroid RxjavaRX实战
RxJava2 实战知识梳理(12) - 实战讲解 publis

RxJava2 实战知识梳理(12) - 实战讲解 publis

作者: 泽毛 | 来源:发表于2017-09-04 00:11 被阅读938次

    RxJava2 实战系列文章

    RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通知 UI 更新
    RxJava2 实战知识梳理(2) - 计算一段时间内数据的平均值
    RxJava2 实战知识梳理(3) - 优化搜索联想功能
    RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯
    RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作
    RxJava2 实战知识梳理(6) - 基于错误类型的重试请求
    RxJava2 实战知识梳理(7) - 基于 combineLatest 实现的输入表单验证
    RxJava2 实战知识梳理(8) - 使用 publish + merge 优化先加载缓存,再读取网络数据的请求过程
    RxJava2 实战知识梳理(9) - 使用 timer/interval/delay 实现任务调度
    RxJava2 实战知识梳理(10) - 屏幕旋转导致 Activity 重建时恢复任务
    RxJava2 实战知识梳理(11) - 检测网络状态并自动重试请求
    RxJava2 实战知识梳理(12) - 实战讲解 publish & replay & share & refCount & autoConnect
    RxJava2 实战知识梳理(13) - 如何使得错误发生时不自动停止订阅关系
    RxJava2 实战知识梳理(14) - 在 token 过期时,刷新过期 token 并重新发起请求
    RxJava2 实战知识梳理(15) - 实现一个简单的 MVP + RxJava + Retrofit 应用


    一、前言

    今天,我们来整理以下几个大家容易弄混的概念,并用实际例子来演示,可以从 RxSample 的第十二章中获取:

    • publish
    • reply
    • ConnectableObservable
    • connect
    • share
    • refCount
    • autoConnect

    对于以上这些概念,可以用一幅图来概括:


    从图中可以看出,这里面可以供使用者订阅的Observable可以分为四类,下面我们将逐一介绍这几种Observable的特点:
    • 第一类:Cold Observable,就是我们通过Observable.createObservable.interval等创建型操作符生成的Observable
    • 第二类:由Cold Observable经过publish()或者replay(int N)操作符转换成的ConnectableObservable
    • 第三类:由ConnectableObservable经过refCount(),或者由Cold Observable经过share()转换成的Observable
    • 第四类:由ConnectableObservable经过autoConnect(int N)转换成的Observable

    二、Cold Observable

    Cold Observable就是我们通过Observable.createObservable.interval等创建型操作符生成的Observable,它具有以下几个特点:

    • 当一个订阅者订阅Cold Observable时,Cold Observable会重新开始发射数据给该订阅者。
    • 当多个订阅者订阅到同一个Cold Observable,它们收到的数据是相互独立的。
    • 当一个订阅者取消订阅Cold Observable后,Cold Observable会停止发射数据给该订阅者,但不会停止发射数据给其它订阅者。

    下面,我们演示一个例子,首先我们创建一个Cold Observable

        //直接订阅Cold Observable。
        private void createColdSource() {
            mConvertObservable = getSource();
        }
    
        private Observable<Integer> getSource() {
            return Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                    try {
                        int i = 0;
                        while (true) {
                            Log.d(TAG, "源被订阅者发射数据=" + i + ",发送线程ID=" + Thread.currentThread().getId());
                            mSourceOut.add(i);
                            observableEmitter.onNext(i++);
                            updateMessage();
                            Thread.sleep(1000);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).subscribeOn(Schedulers.io());
        }
    

    在创建两个订阅者,它们可以随时订阅到Cold Observable或者取消对它的订阅:

        private void startSubscribe1() {
            if (mConvertObservable != null && mDisposable1 == null) {
                mDisposable1 = mConvertObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "订阅者1收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
                        mSubscribe1In.add(integer);
                        updateMessage();
                    }
                });
            }
        }
    
        private void disposeSubscribe1() {
            if (mDisposable1 != null) {
                mDisposable1.dispose();
                mDisposable1 = null;
                mSubscribe1In.clear();
                updateMessage();
            }
        }
    
        private void startSubscribe2() {
            if (mConvertObservable != null && mDisposable2 == null) {
                mDisposable2 = mConvertObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "订阅者2收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
                        mSubscribe2In.add(integer);
                        updateMessage();
                    }
                });
            }
        }
    
        private void disposeSubscribe2() {
            if (mDisposable2 != null) {
                mDisposable2.dispose();
                mDisposable2 = null;
                mSubscribe2In.clear();
                updateMessage();
            }
        }
    

    为了验证之前说到的几个特点,进入程序之后,我们会先创建该Cold Observable,之后进行一系列的操作,效果如下:


    在上面的图中,我们做了一下几步操作:
    • 第一步:启动应用,创建Cold Observable,这时候Cold Observable没有发送任何数据。
    • 第二步:Observer1订阅Observable,此时Cold Observable开始发送数据,Observer1也可以收到数据,即 一个订阅者订阅 Cold Observable 时, Cold Observable 会开始发射数据给该订阅者
    • 第三步:Observer2订阅Observable,此时Observable2也可以收到数据,但是它和Observable1收到的数据是相互独立的,即 当多个订阅者订阅到同一个 Cold Observable ,它们收到的数据是相互独立的
    • 第四步:Observer1取消对Observable的订阅,这时候Observer1收不到数据,并且Observable也不会发射数据给它,但是仍然会发射数据给Observer2,即 当一个订阅者取消订阅 Cold Observable 后,Cold Observable 会停止发射数据给该订阅者,但不会停止发射数据给其它订阅者
    • 第五步:Observer1重新订阅Observable,这时候Observable0开始发射数据给Observer1,即 一个订阅者订阅 Cold Observable 时, Cold Observable 会重新开始发射数据给该订阅者

    三、由 Cold Observable 转换的 ConnectableObservable

    在了解完Cold Observable之后,我们再来看第二类的Observable,它的类型为ConnectableObservable,它是通过Cold Observable经过下面两种方式生成的:

    • .publish()
    • .reply(int N)

    如果使用.publish()创建,那么订阅者只能收到在订阅之后Cold Observable发出的数据,而如果使用reply(int N)创建,那么订阅者在订阅后可以收到Cold Observable在订阅之前发送的N个数据。

    我们先以publish()为例,介绍ConnectableObservable的几个特点:

    • 无论ConnectableObservable有没有订阅者,只要调用了ConnectableObservableconnect方法,Cold Observable就开始发送数据。
    • connect会返回一个Disposable对象,调用了该对象的dispose方法,Cold Observable将会停止发送数据,所有ConnectableObservable的订阅者也无法收到数据。
    • 在调用connect返回的Disposable对象后,如果重新调用了connect方法,那么Cold Observable会重新发送数据。
    • 当一个订阅者订阅到ConnectableObservable后,该订阅者会收到在订阅之后,Cold Observable发送给ConnectableObservable的数据。
    • 当多个订阅者订阅到同一个ConnectableObservable时,它们收到的数据是相同的。
    • 当一个订阅者取消对ConnectableObservable,不会影响其他订阅者收到消息。

    下面,我们创建一个ConnectableObservable,两个订阅者之后会订阅到它,而不是Cold Observable

        //.publish()将源Observable转换成为HotObservable,当调用它的connect方法后,无论此时有没有订阅者,源Observable都开始发送数据,订阅者订阅后将可以收到数据,并且订阅者解除订阅不会影响源Observable数据的发射。
        public void createPublishSource() {
            mColdObservable = getSource();
            mConvertObservable = mColdObservable.publish();
            mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
        }
    

    和上面一样,还是用一个例子来演示,该例子的效果为:


    • 第一步:启动应用,通过Cold Observablepublish方法创建ConnectableObservable,并调用ConnectableObservableconnect方法,可以看到,此时虽然ConnectableObservable没有任何订阅者,但是Cold Observable也已经开始发送数据。
    • 第二步:Observer1订阅到ConnectableObservable,此时它只能收到订阅之后Cold Observable发射的数据。
    • 第三步:Observer2订阅到ConnectableObservableCold Observable只会发射一份数据,并且Observer1Observer2收到的数据是相同的。
    • 第三步:Observer1取消对ConnectableObservable的订阅,Cold Observable仍然会发射数据,Observer2仍然可以收到Cold Observable发射的数据。
    • 第四步:Observer1重新订阅ConnectableObservable,和第三步相同,它仍然只会收到订阅之后Cold Observable发射的数据。
    • 第五步:通过connect返回的Disposable对象,调用dispose方法,此时Cold Observable停止发射数据,并且Observer1Observer2都收不到数据。

    上面这些现象发生的根本原因在于:现在ObserverObserver2都是订阅到ConnectableObservable,真正产生数据的Cold Observable并不知道他们的存在,和它交互的是ConnectableObservableConnectableObservable相当于一个中介,它完成下面两项任务:

    • 对于上游:通过connectdispose方法决定是否要订阅到Cold Observer,也就是决定了Cold Observable是否发送数据。
    • 对于下游:将Cold Observable发送的数据转交给它的订阅者。

    四、由 ConnectableObservable 转换成 Observable

    ConnectableObservable转换成Observable有两种方法,我们分为两节介绍下当订阅到转换后的Observable时的现象:

    • .refCount()
    • .autoConnect(int N)

    4.1 ConnectableObservable 由 refCount 转换成 Observable

    经过refCount方法,ConnectableObservable可以转换成正常的Observable,我们称为refObservable,这里我们假设ConnectableObservable是由Cold Observable通过publish()方法转换的,对于它的订阅者,有以下几个特点:

    • 第一个订阅者订阅到refObservable后,Cold Observable开始发送数据。
    • 之后的订阅者订阅到refObservable后,只能收到在订阅之后Cold Observable发送的数据。
    • 如果一个订阅者取消订阅到refObservable后,假如它是当前refObservable的唯一一个订阅者,那么Cold Observable会停止发送数据;否则,Cold Observable仍然会继续发送数据,其它的订阅者仍然可以收到Cold Observable发送的数据。

    接着上例子,我们创建一个refObservable

        //.share()相当于.publish().refCount(),当有订阅者订阅时,源订阅者会开始发送数据,如果所有的订阅者都取消订阅,源Observable就会停止发送数据。
        private void createShareSource() {
            mColdObservable = getSource();
            mConvertObservable = mColdObservable.publish().refCount();
        }
    

    示例如下:



    操作分为以下几步:

    • 第一步:通过.publish().refCount()创建由ConnectableObservable转换后的refObservable,此时Cold Observable没有发送任何消息。
    • 第二步:Observer1订阅到refObservableCold Observable开始发送数据,Observer1接收数据。
    • 第三步:Observer2订阅到refObservable,它只能收到在订阅之后Cold Observable发送的数据。
    • 第四步:Observer1取消订阅,Cold Observable继续发送数据,Observer2仍然能收到数据。
    • 第五步:Observer2取消订阅,Cold Observable停止发送数据。
    • 第六步:Observer1重新订阅,Cold Observable重新开始发送数据。

    最后说明一点:订阅到Cold Observable.publish().refCount()Cold Observableshare()所返回的Observable是等价的。

    4.2 ConnectableObservable 由 autoConnect(int N) 转换成 Observable

    autoConnect(int N)refCount很类似,都是将ConnectableObservable转换成普通的Observable,我们称为autoObservable,同样我们先假设ConnectableObservable是由Cold Observable通过publish()方法生成的,它有以下几个特点:

    • 当有N个订阅者订阅到refObservable后,Cold Observable开始发送数据。
    • 之后的订阅者订阅到refObservable后,只能收到在订阅之后Cold Observable发送的数据。
    • 只要Cold Observable开始发送数据,即使所有的autoObservable的订阅和都取消了订阅,Cold Observable也不会停止发送数据,如果想要Cold Observable停止发送数据,那么可以使用autoConnect(int numberOfSubscribers, Consumer connection)Consumer返回的Disposable,它的作用和ConnectableObservableconnect方法返回的Disposable相同。

    其创建方法如下所示:

        //.autoConnect在有指定个订阅者时开始让源Observable发送消息,但是订阅者是否取消订阅不会影响到源Observable的发射。
        private void createAutoConnectSource() {
            mColdObservable = getSource();
            mConvertObservable = mColdObservable.publish().autoConnect(1, new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    mConvertDisposable = disposable;
                }
            });
        }
    

    示例效果如下:



    我们进行了如下几步操作:

    • 第一步:启动应用,创建autoConnect转换后的autoObservable
    • 第二步:Observer1订阅到autoObservable,此时满足条件,Cold Observable开始发送数据。
    • 第三步:Observer2订阅到autoObservable,它只能收到订阅后发生的数据。
    • 第四步:Observer1取消订阅,Cold Observable继续发送数据,Observer2仍然可以收到数据。
    • 第五步:Observer2取消订阅,Cold Observable仍然继续发送数据。
    • 第六步:Observer2订阅到autoObservable,它只能收到订阅后发送的消息了。
    • 第七步:调用mConvertDisposabledisposeCold Observable停止发送数据。

    五、publish 和 reply(int N) 的区别

    在上面的例子当中,所有总结的特点都是建立在ConnectableObservable是由publish()生成,只所以这么做,是为了方便大家理解,无论是订阅到ConnectableObservable,还是由ConnectableObservable转换的refObservableautoObservable,使用这两种方式创建的唯一区别就是,订阅者在订阅后,如果是通过publish()创建的,那么订阅者之后收到订阅后Cold Observable发送的数据;而如果是reply(int N)创建的,那么订阅者还能额外收到N个之前Cold Observable发送的数据,我们用下面一个小例子来演示,订阅者订阅到的Observable如下:

        //.reply会让缓存源Observable的N个数据项,当有新的订阅者订阅时,它会发送这N个数据项给它。
        private void createReplySource() {
            mColdObservable = getSource();
            mConvertObservable = mColdObservable.replay(3);
            mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
        }
    

    示例演示效果:



    操作步骤:

    • 第一步:启动应用,通过Cold Observablereplay(3)方法创建ConnectableObservable,可以看到,此时虽然ConnectableObservable没有任何订阅者,但是Cold Observable也已经开始发送数据。
    • 第二步:Observer1订阅到ConnectableObservable,此时它会先收到之前发射的3个数据,之后收到订阅之后Cold Observable发射的数据。

    最后再提一下,更详细的代码大家可以从 RxSample 的第十二章中获取。


    更多文章,欢迎访问我的 Android 知识梳理系列:

    相关文章

      网友评论

      • 那bi很美:这么好的系列文章,竟然这么少人看,太可惜
        泽毛:@那bi很美 哈哈,还好拉,多谢支持!:smile:
      • yao_94:写的很好,不过最后一个操作步骤的第一步应该写错了吧,应该是Cold Observable的replay(3)方法创建ConnectableObservable
        泽毛::+1: 谢谢提醒,已经修正。
      • 牙Sir:怎么把replay都写成了reply。。。不过写得不错
        泽毛:@Scott_Wang 哈哈,失误😂
      • CCY0122:强强强!!解释的很清楚
        泽毛: @CCY0122 😃谢谢支持

      本文标题:RxJava2 实战知识梳理(12) - 实战讲解 publis

      本文链接:https://www.haomeiwen.com/subject/sozjjxtx.html