美文网首页Android-RxJavaAndroid-Rxjava&retrofit&daggerAndroid开发
这可能是最好的RxJava 2.x 入门教程(二)

这可能是最好的RxJava 2.x 入门教程(二)

作者: nanchen2251 | 来源:发表于2017-06-22 17:28 被阅读27329次

这可能是最好的 RxJava 2.x 入门教程系列专栏
文章链接:
这可能是最好的 RxJava 2.x 入门教程(完结版)[推荐直接看这个]
这可能是最好的RxJava 2.x 入门教程(一)
这可能是最好的RxJava 2.x 入门教程(二)
这可能是最好的RxJava 2.x 入门教程(三)
这可能是最好的RxJava 2.x 入门教程(四)
这可能是最好的RxJava 2.x 入门教程(五)
GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples
为了满足大家的饥渴难耐,GitHub 将同步更新代码,主要包含基本的代码封装,RxJava 2.x 所有操作符应用场景介绍和实际应用场景,后期除了 RxJava 可能还会增添其他东西,总之,GitHub 上的 Demo 专为大家倾心打造。传送门:https://github.com/nanchen2251/RxJava2Examples

前言

很快我们就迎来了第二期,上一期我们主要讲解了 RxJava 1.x 到 2.x 的变化概览,相信各位熟练掌握RxJava 1.x的老司机们随便看一下变化概览就可以上手RxJava 2.x了,但为了满足更广大的年轻一代司机(未来也是老司机),在本节中,我们将学习RxJava 2.x 强大的操作符章节。
【注】以下所有操作符标题都可直接点击进入官方doc查看。

正题

Create

create 操作符应该是最常见的操作符了,主要用于产生一个 Obserable 被观察者对象,为了方便大家的认知,以后的教程中统一把被观察者 Observable 称为发射器(上游事件),观察者 Observer 称为接收器(下游事件)。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                mRxOperatorsText.append("Observable emit 1" + "\n");
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                mRxOperatorsText.append("Observable emit 2" + "\n");
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                mRxOperatorsText.append("Observable emit 3" + "\n");
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                mRxOperatorsText.append("Observable emit 4" + "\n");
                Log.e(TAG, "Observable emit 4" + "\n" );
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                mRxOperatorsText.append("onSubscribe : " + d.isDisposed() + "\n");
                Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n" );
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                mRxOperatorsText.append("onNext : value : " + integer + "\n");
                Log.e(TAG, "onNext : value : " + integer + "\n" );
                i++;
                if (i == 2) {
                    // 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
                    mDisposable.dispose();
                    mRxOperatorsText.append("onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
                    Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                mRxOperatorsText.append("onError : value : " + e.getMessage() + "\n");
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
            }

            @Override
            public void onComplete() {
                mRxOperatorsText.append("onComplete" + "\n");
                Log.e(TAG, "onComplete" + "\n" );
            }
        });

输出:


需要注意的几点是:

  • 在发射事件中,我们在发射了数值 3 之后,直接调用了 e.onComlete(),虽然无法接收事件,但发送事件还是继续的。

  • 另外一个值得注意的点是,在 RxJava 2.x 中,可以看到发射事件方法相比 1.x 多了一个 throws Excetion,意味着我们做一些特定操作再也不用 try-catch 了。

  • 并且 2.x 中有一个 Disposable 概念,这个东西可以直接调用切断,可以看到,当它的 isDisposed() 返回为 false 的时候,接收器能正常接收事件,但当其为 true 的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。

Map

Map 基本算是 RxJava 中一个最简单的操作符了,熟悉 RxJava 1.x 的知道,它的作用是对发射时间发送的每一个事件应用一个函数,是的每一个事件都按照指定的函数去变化,而在 2.x 中它的作用几乎一致。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                mRxOperatorsText.append("accept : " + s +"\n");
                Log.e(TAG, "accept : " + s +"\n" );
            }
        });

输出:


是的,map 基本作用就是将一个 Observable 通过某种函数关系,转换为另一种 Observable,上面例子中就是把我们的 Integer 数据变成了 String 类型。从Log日志显而易见。

Zip

zip 专用于合并事件,该合并不是连接(连接操作符后面会说),而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。

Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
            @Override
            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                mRxOperatorsText.append("zip : accept : " + s + "\n");
                Log.e(TAG, "zip : accept : " + s + "\n");
            }
        });
private Observable<String> getStringObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("A");
                    mRxOperatorsText.append("String emit : A \n");
                    Log.e(TAG, "String emit : A \n");
                    e.onNext("B");
                    mRxOperatorsText.append("String emit : B \n");
                    Log.e(TAG, "String emit : B \n");
                    e.onNext("C");
                    mRxOperatorsText.append("String emit : C \n");
                    Log.e(TAG, "String emit : C \n");
                }
            }
        });
    }

    private Observable<Integer> getIntegerObservable() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext(1);
                    mRxOperatorsText.append("Integer emit : 1 \n");
                    Log.e(TAG, "Integer emit : 1 \n");
                    e.onNext(2);
                    mRxOperatorsText.append("Integer emit : 2 \n");
                    Log.e(TAG, "Integer emit : 2 \n");
                    e.onNext(3);
                    mRxOperatorsText.append("Integer emit : 3 \n");
                    Log.e(TAG, "Integer emit : 3 \n");
                    e.onNext(4);
                    mRxOperatorsText.append("Integer emit : 4 \n");
                    Log.e(TAG, "Integer emit : 4 \n");
                    e.onNext(5);
                    mRxOperatorsText.append("Integer emit : 5 \n");
                    Log.e(TAG, "Integer emit : 5 \n");
                }
            }
        });
    }

输出:


需要注意的是:

  • zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的。

  • 最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同,所以如截图中,5 很孤单,没有人愿意和它交往,孤独终老的单身狗。

Concat

对于单一的把两个发射器连接成一个发射器,虽然 zip 不能完成,但我们还是可以自力更生,官方提供的 concat 让我们的问题得到了完美解决。

Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("concat : "+ integer + "\n");
                        Log.e(TAG, "concat : "+ integer + "\n" );
                    }
                });

输出:


如图,可以看到。发射器 B 把自己的三个孩子送给了发射器 A,让他们组合成了一个新的发射器,非常懂事的孩子,有条不紊的排序接收。

FlatMap

FlatMap 是一个很有趣的东西,我坚信你在实际开发中会经常用到。它可以把一个发射器 Observable 通过某种方法转换为多个 Observables,然后再把这些分散的 Observables装进一个单一的发射器 Observable。但有个需要注意的是,flatMap 并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的 ConcatMap

Observable.create(new ObservableOnSubscribe<Integer>() {
           @Override
           public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
               e.onNext(1);
               e.onNext(2);
               e.onNext(3);
           }
       }).flatMap(new Function<Integer, ObservableSource<String>>() {
           @Override
           public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
               List<String> list = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   list.add("I am value " + integer);
               }
               int delayTime = (int) (1 + Math.random() * 10);
               return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
           }
       }).subscribeOn(Schedulers.newThread())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer<String>() {
                   @Override
                   public void accept(@NonNull String s) throws Exception {
                       Log.e(TAG, "flatMap : accept : " + s + "\n");
                       mRxOperatorsText.append("flatMap : accept : " + s + "\n");
                   }
               });

输出:


一切都如我们预期中的有意思,为了区分 concatMap(下一个会讲),我在代码中特意动了一点小手脚,我采用一个随机数,生成一个时间,然后通过 delay(后面会讲)操作符,做一个小延时操作,而查看 Log 日志也确认验证了我们上面的说法,它是无序的。

concatMap

上面其实就说了,concatMapFlatMap 的唯一区别就是 concatMap 保证了顺序,所以,我们就直接把 flatMap 替换为 concatMap 验证吧。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                int delayTime = (int) (1 + Math.random() * 10);
                return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "flatMap : accept : " + s + "\n");
                        mRxOperatorsText.append("flatMap : accept : " + s + "\n");
                    }
                });

输出:


结果的确和我们预想的一样。

写在最后

好了,这一节就先介绍到这里,下一节我们将学习其它的一些操作符,在操作符讲完后再带大家进入实际情景,希望持续关注,代码传送门

做不完的开源,写不完的矫情。欢迎扫描下方二维码或者公众号搜索「nanchen」关注我的微信公众号,目前多运营 Android ,尽自己所能为你提升。如果你喜欢,为我点赞分享吧~


nanchen

相关文章

网友评论

  • ikiler:没有任何讲解,单纯的贴代码,写log,可能接触过rxjava的好理解,对于从没接触过rxjava的人很伤啊,很多的类,方法,变量根本不知道是什么,也不知道干什么的哎·
    雾野客樵:我觉得写的还好,讲出了每一个操作符的具体功能,还有图表和结果显示,有这些就足够了,我之前看其他人写的看完都不知道要做啥!
    nanchen2251:楼上正解,我和 Season_zlc 老师讲解的思路不一样。
    Euterpe:@ikiler 看看这个https://www.jianshu.com/p/8818b98c44e2
  • Mr_panmin:flatMap和concatMap不加.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread())时,打印不全,有时候根本就不打印,这是什么情况?
    郭之源:Thread.sleep(1000);
    在主线程睡一秒试试:mask:
  • e679508ef23f:楼主,这段示例代码到底是在哪个线程执行的?
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(new Consumer<MobileAddress>() {
    @Override
    public void accept(@NonNull MobileAddress s) throws Exception {
    Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
    }
    }).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<MobileAddress>() {
    sakurekid:被观察者在subscribeOn(Schedulers.io())在自带的线程里面运行的,观察者.observeOn(AndroidSchedulers.mainThread())在主线程里面运行的
    _He先森_:主线程吧,你打印下当前的线程名称,
  • 布吃饺子:我这里flatMap运行结果是顺序一致,多次实验结果相同
    柒月下旬:把随机时间变大一点 就比较明显了
    刻薄小北:我的也是
  • 真像大白:Zip 操作符: 请求两支接口 把两支接口的数据组合成一个 如果其中一只接口 error 了 能不能抛弃这个数据 只用第一支接口的数据返回呢 而不是就直接走 onError 了 或者应该用其他什么操作符么
    ffc226ee64b9:可以试试tryWhen
  • 30d510f59682:Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
    @Override
    public String apply(@NonNull String s, @nonnull Integer integer) throws Exception {
    return s + integer;
    }
    }).subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
    mRxOperatorsText.append("zip : accept : " + s + "\n");
    Log.e(TAG, "zip : accept : " + s + "\n");
    }
    });

    subscribe(new Consumer<String>()没有指定运行在AndroidSchedulers.mainThread(),但它运行在AndroidSchedulers.mainThread()线程中?
    ffc226ee64b9:如果没有指定线程,那么发送事件默认运行再事件产生的线程,接收事件运行在发送事件所在的线程,你上面的代码是在主线程创建的所以运行在主线程
  • 黑白咖:签到
  • 七七就是七七:楼主我这里有几个关于flatMap的问题,
    1、代码里面floatMap加了delay运行,不会打印出任何东西;
    2、不加delay也不加线程异步的时候flatMap打印出来是有序的;
    3、不加delay加了线程异步,打印出来数据不全
    查了很久也没查到
    到处乱逛:貌似是有这个问题,最后发现是什么原因了么
    Mr_panmin:一样的问题
  • 孤独蓝天:最好文章底部也有系列的下一篇,不然每次得回到开头麻烦,谢谢
    孤独蓝天: @南尘2251对了,哥们,你的接口是天狗网的~点击那个文字去不了,百度的天狗网有~
    nanchen2251: @孤独蓝天 好的。
  • petma: Observable.concat(getStringObservable(), getIntegerObservable() ).subscribe(new Consumer<Object>() {
    @Override
    public void accept(@NonNull Object s) throws Exception {
    mRxOperatorsText.append("zip : accept : " + s + "\n");
    Log.e(TAG, "zip : accept : " + s + "\n");
    }
    });

    为什么只发送第一个呢?
    z白依:是不是getStringObservable() 没有 onComplete() ?
  • 潇风寒月:只有我发现了亮点么: 所以如截图中,5 很孤单,没有人愿意和它交往,孤独终老的单身狗。:dog: :dog:
  • 46cf8f8ae1c2:写得很好,支持一下,很到位实用
  • RamboMing:Zip操作符:1.getStringObservable和getIntegerObservable不指定线程,会等getStringObservable执行完,才能执行zip和getIntegerObservable?
    2.getStringObservable和getIntegerObservable指定线程子线程,zip收不到信息?请问为什么
    ffc226ee64b9:@这是谁辣么帅 不是B发射完接收,是B发射一个接收一个,直到接收完A和B中最少事件个数的时候订阅者停止接收,但是被订阅者不会停止发送
    f88641dfd7c1:应该是你没订阅。
    这是谁辣么帅:1. 两个发射器在同一线程执行,发射当然有顺序,A发射完B发射,B发射完接收
    2. 按理讲应该是能接收到信息,如果接收不到可能是发射器出错了

本文标题:这可能是最好的RxJava 2.x 入门教程(二)

本文链接:https://www.haomeiwen.com/subject/cpbfcxtx.html