美文网首页Android-RxJavaAndroid-Rxjava&retrofit&daggerAndroid 入门进阶
这可能是最好的RxJava 2.x 入门教程(四)

这可能是最好的RxJava 2.x 入门教程(四)

作者: nanchen2251 | 来源:发表于2017-06-26 18:02 被阅读10749次

    这可能是最好的 RxJava 2.x 入门教程系列专栏
    文章链接:
    这可能是最好的 RxJava 2.x 入门教程(完结版)【重磅推出】
    这可能是最好的 RxJava 2.x 入门教程(一)
    这可能是最好的 RxJava 2.x 入门教程(二)
    这可能是最好的 RxJava 2.x 入门教程(三)
    这可能是最好的 RxJava 2.x 入门教程(四)
    这可能是最好的 RxJava 2.x 入门教程(五)
    GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples
    为了满足大家的饥渴难耐,GitHub 将同步更新代码,主要包含基本的代码封装,RxJava 2.x 所有操作符应用场景介绍和实际应用场景,后期除了 RxJava 可能还会增添其他东西,总之,GitHub 上的 Demo 专为大家倾心打造。传送门:https://github.com/nanchen2251/RxJava2Examples

    前言

    最近很多小伙伴私信我,说自己很懊恼,对于 RxJava 2.x 系列一看就能明白,但自己写却又写不出来。如果 LZ 能放上实战情景教程就最好不过了。也是哈,单讲我们的操作符,也让我们的教程不温不火,但 LZ 自己选择的路,那跪着也要走完呀。所以,也就让我可怜的小伙伴们忍忍了,操作符马上就讲完了。

    正题

    Single

    顾名思义,Single 只会接收一个参数,而 SingleObserver 只会调用 onError() 或者 onSuccess()

    Single.just(new Random().nextInt())
                    .subscribe(new SingleObserver<Integer>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
    
                        }
    
                        @Override
                        public void onSuccess(@NonNull Integer integer) {
                            mRxOperatorsText.append("single : onSuccess : "+integer+"\n");
                            Log.e(TAG, "single : onSuccess : "+integer+"\n" );
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            mRxOperatorsText.append("single : onError : "+e.getMessage()+"\n");
                            Log.e(TAG, "single : onError : "+e.getMessage()+"\n");
                        }
                    });
    

    输出:


    distinct

    去重操作符,简单的作用就是去重。


    Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                    .distinct()
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            mRxOperatorsText.append("distinct : " + integer + "\n");
                            Log.e(TAG, "distinct : " + integer + "\n");
                        }
                    });
    

    输出:



    很明显,发射器发送的事件,在接收的时候被去重了。

    debounce

    去除发送频率过快的项,看起来好像没啥用处,但你信我,后面绝对有地方很有用武之地。


    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                    // send events with simulated time wait
                    emitter.onNext(1); // skip
                    Thread.sleep(400);
                    emitter.onNext(2); // deliver
                    Thread.sleep(505);
                    emitter.onNext(3); // skip
                    Thread.sleep(100);
                    emitter.onNext(4); // deliver
                    Thread.sleep(605);
                    emitter.onNext(5); // deliver
                    Thread.sleep(510);
                    emitter.onComplete();
                }
            }).debounce(500, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            mRxOperatorsText.append("debounce :" + integer + "\n");
                            Log.e(TAG,"debounce :" + integer + "\n");
                        }
                    });
    

    输出:



    代码很清晰,去除发送间隔时间小于 500 毫秒的发射事件,所以 1 和 3 被去掉了。

    defer

    简单地时候就是每次订阅都会创建一个新的 Observable,并且如果没有被订阅,就不会产生新的 Observable

    Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> call() throws Exception {
                    return Observable.just(1, 2, 3);
                }
            });
    
    
            observable.subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    mRxOperatorsText.append("defer : " + integer + "\n");
                    Log.e(TAG, "defer : " + integer + "\n");
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    mRxOperatorsText.append("defer : onError : " + e.getMessage() + "\n");
                    Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
                }
    
                @Override
                public void onComplete() {
                    mRxOperatorsText.append("defer : onComplete\n");
                    Log.e(TAG, "defer : onComplete\n");
                }
            });
    

    输出:


    last

    last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。

    Observable.just(1, 2, 3)
                    .last(4)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            mRxOperatorsText.append("last : " + integer + "\n");
                            Log.e(TAG, "last : " + integer + "\n");
                        }
                    });
    

    输出:


    merge

    merge 顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,而在 Rx 操作符中,merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。

    Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            mRxOperatorsText.append("merge :" + integer + "\n");
                            Log.e(TAG, "accept: merge :" + integer + "\n" );
                        }
                    });
    

    输出:


    reduce

    reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。

    Observable.just(1, 2, 3)
                   .reduce(new BiFunction<Integer, Integer, Integer>() {
                       @Override
                       public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                           return integer + integer2;
                       }
                   }).subscribe(new Consumer<Integer>() {
               @Override
               public void accept(@NonNull Integer integer) throws Exception {
                   mRxOperatorsText.append("reduce : " + integer + "\n");
                   Log.e(TAG, "accept: reduce : " + integer + "\n");
               }
           });
    

    输出:



    可以看到,代码中,我们中间采用 reduce ,支持一个 function 为两数值相加,所以应该最后的值是:1 + 2 = 3 + 3 = 6 , 而Log 日志完美解决了我们的问题。

    scan

    scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出。

    Observable.just(1, 2, 3)
                    .scan(new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                            return integer + integer2;
                        }
                    }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    mRxOperatorsText.append("scan " + integer + "\n");
                    Log.e(TAG, "accept: scan " + integer + "\n");
                }
            });
    

    输出:



    看日志,没毛病。

    window

    按照实际划分窗口,将数据发送给不同的 Observable

    mRxOperatorsText.append("window\n");
           Log.e(TAG, "window\n");
           Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
                   .take(15) // 最多接收15个
                   .window(3, TimeUnit.SECONDS)
                   .subscribeOn(Schedulers.io())
                   .observeOn(AndroidSchedulers.mainThread())
                   .subscribe(new Consumer<Observable<Long>>() {
                       @Override
                       public void accept(@NonNull Observable<Long> longObservable) throws Exception {
                           mRxOperatorsText.append("Sub Divide begin...\n");
                           Log.e(TAG, "Sub Divide begin...\n");
                           longObservable.subscribeOn(Schedulers.io())
                                   .observeOn(AndroidSchedulers.mainThread())
                                   .subscribe(new Consumer<Long>() {
                                       @Override
                                       public void accept(@NonNull Long aLong) throws Exception {
                                           mRxOperatorsText.append("Next:" + aLong + "\n");
                                           Log.e(TAG, "Next:" + aLong + "\n");
                                       }
                                   });
                       }
                   });
    

    输出:


    写在最后

    至此,大部分 RxJava 2.x 的操作符就告一段落了,当然还有一些没有提到的操作符,不是说它们不重要,而是 LZ 也要考虑大家的情况,接下来就会根据实际应用场景来对 RxJava 2.x 发起冲锋。如果想看更多的数据,请移步 GitHub:https://github.com/nanchen2251/RxJava2Examples

    做不完的开源,写不完的矫情。欢迎扫描下方二维码或者公众号搜索「nanchen」关注我的微信公众号,目前多运营 Android ,尽自己所能为你提升。如果你喜欢,为我点赞分享吧~


    nanchen

    相关文章

      网友评论

      • 風的記憶:写的很不错,很适合入门看
      • 达则兼济天下:debounce补充: 两个相邻数据发射的时间间隔决定了前一个数据是否会被丢弃,然而demo代码中5是最后一个数据,所以后面设置的510ms并不影响它是否被丢弃,也仅仅起一个线程等待时间的作用吧。代码分析如下:
        emitter.onNext(1); // skip 先收到一个1
        Thread.sleep(400);
        emitter.onNext(2); // deliver 过了400ms收到一个2,小于设定时间500ms,把前一个丢掉,现在只有一个2
        Thread.sleep(505);
        emitter.onNext(3); // skip 过了505ms收到一个3,符合设定时间,保存,现在是2、3
        Thread.sleep(100);
        emitter.onNext(4); // deliver 过了100ms收到一个4,小于设定时间,把前一个丢掉,丢掉3,保存4,现在是2、4
        Thread.sleep(605);
        emitter.onNext(5); // deliver 过了605ms收到一个5,符合设定时间,保存,现在是2、4、5
        Thread.sleep(510);
        emitter.onComplete();
        V_Ballack:感谢解释
        达则兼济天下:@ezfantasy 😄
        ezfantasy:这个解释看得懂,博主贴出来的反而让我理解答案应该是1,3,5
      • luo2016:写的挺好的,清晰易懂
      • 追风筝的boy:我好懊恼,我突然变成了一个爱哭鼻子的傻瓜:joy:
        追风筝的boy:@nanchen2251 就是看到你前言里面的第一句话,很多同学私信给你,说自己很懊恼。看到这个词,突然就想起了《夏洛特烦恼》里面这句台词。没什么特别的含义:sweat:
        nanchen2251:@追风筝的boy 什么鬼
      • 30035123f1bc:嗯 这篇有点急了
      • thinkerzhangyan:debounce v发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才进行发射。
      • leilifengxingmw:文章感觉文笔叙述有的地方不是很详细,但还是可以的:smile:
      • b09dbea7de6a:defer是延迟订阅的意思。在订阅的时候,执行ObservableOnScubsribe call方法里面的代码。
      • ImTudou:关于last的建议:
        通过Observable只发出最后一个项目(或符合某些条件的最后一个项目)

        重点在发出(emit)一词,而不是取出。
        nanchen2251:这个可以。
      • 天苍地老不留情:scan操作符的例子,为什么会把1给打印出来呢?
        Prom_Jual:因为 1和另外一个没有赋值的(默认0)相加,所以sum=1
      • 间歇性丶神经病患者:楼主超厉害
        nanchen2251:@间歇性丶神经病患者 0 0.
      • SZhua:不孬,讲的很好
        nanchen2251:@SZhua 谢谢认了
      • 黑白咖:签到
      • Alsomail:last好像里面传任何数值,都是返回最后一个item啊,而且里面只能写入一个int类型参数,那它的参数有什么用呢
      • 3a1451eb7fb4:defer到底是怎么回事,感觉没讲清楚啊,博主
        hubery_:感觉其实就懒加载,只有具体的使用时才会创建
        seraphzxz:defer 的意思是,直到有订阅,才会创建 Observable ,具有延时的效果。比如:int a = 1;

        Observable.just(a); 此时 just 中的 a 已经初始化了,此后在修改 int a = 2;在订阅时依然会发送 a=1。

        使用 defer 就可以将初始化延迟到订阅时,大概就是这个意思吧。
        yao_94:同样表示没看懂
      • Davisxy:window这个没懂
        喝绿茶的考拉:@相公无爱 我理解window的功能和buffer类似 不过buffer 把数据源分割成list 而window把数据源分割成了observable
      • icoo:distinct 在第三篇里写过了,要去重啊:stuck_out_tongue_closed_eyes:
      • _大洲:从来只有听过RxJava的我竟然全看懂了:joy:
      • a953b6bd860e:整体不错,就是有些操作符没解释清楚
      • 飞奔的小马:debounce操作符 发射的最后一项即使把sleep时间改为小于500ms的值,还是会输出的,这个是怎么解释呢
        达则兼济天下:@zilch1974 你解释的很到位,两个相邻数据发射的时间间隔决定了前一个数据是否会被丢弃,然而demo代码中5是最后一个数据,所以后面设置的510ms并不影响它是否被丢弃,也仅仅起一个线程等待时间的作用吧。代码具体分析如下:
        emitter.onNext(1); // skip 先收到一个1
        Thread.sleep(400);
        emitter.onNext(2); // deliver 过了400ms收到一个2,小于设定时间500ms,把前一个丢掉,现在只有一个2
        Thread.sleep(505);
        emitter.onNext(3); // skip 过了505ms收到一个3,符合设定时间,保存,现在是2、3
        Thread.sleep(100);
        emitter.onNext(4); // deliver 过了100ms收到一个4,小于设定时间,把前一个丢掉,丢掉3,保存4,现在是2、4
        Thread.sleep(605);
        emitter.onNext(5); // deliver 过了605ms收到一个5,符合设定时间,保存,现在是2、4、5
        Thread.sleep(510);
        emitter.onComplete();
        e145da3c954d:我一开始也有这个疑惑,后来看了源码的解释后终于理解了。
        比如我按照以下顺序发送数据:1--->2--->3--->4--->5--->6,间隔时间依次为200,350,250,280,290ms,而我的debounce设定的时间为285,那么mirro的observable首先收到了一个1,200ms后收到了2,因为200小于285,所以前面的1会被丢弃掉,现在只有一个2,然后再过350ms后来了3,因为350大于设定时间,所以3会被存储,之后的4是250ms之后来的,所以它会替换掉4...
        就是这么一个流程。
      • 一步一年华:23的为啥会输出scan 1??
      • Xdjm:distinct在第三章讲过啦

      本文标题:这可能是最好的RxJava 2.x 入门教程(四)

      本文链接:https://www.haomeiwen.com/subject/aspjcxtx.html