美文网首页RxJava
RxJava操作符使用指南

RxJava操作符使用指南

作者: 小白兔兽型大发 | 来源:发表于2018-11-10 21:42 被阅读152次

      先来张女神的剧照放松一下...


    好的好的

      话说gakki又拍新剧了,继爱干家务且投怀送抱不送不行之后,又完美诠释了"假面"超人,微笑pasta,其实如果没人给予阳光,那就做个快乐的小太阳吧.
      好了,我们进入正题..


    (*^▽^*)
      最近一直在重构老项目的代码,每天一顿删删删,开心到飞起...相信RxJava到现在没用过的人估计也很少了,我在重构的过程当中,把之前一些复杂的逻辑基本上都用操作符进行了处理,在这边我们就来总结一下,好像有点炒冷饭的意思...那RxJava(2.0+)的操作符大致有这么几类:创建,变换,过滤,组合等等,我们就一类一类说...

    创建

    首先,我们要说一下被观察者,也就是上游的创建方式,那我们发射一个带这样集合的火箭:
    private List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);

    create

     Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (Integer number : mList) {
                        emitter.onNext(number);
                    }
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
    
                @Override
                public void onSubscribe(Disposable d) 
                ...
                @Override
                public void onNext(Integer number)
                  ...
                @Override
                public void onError(Throwable e)
                 ...
                @Override
                public void onComplete()
                ...
    
            });
    

    最原始的版本就是用create来创建,但是遍历集合一个一个发,太臃肿了,那我们可以换一种方式,并且有两处地方我们改动一下:
    1:我现在只想要onNext()返回,这就要看看是否有Observer的实现类
    2:同时Observable是不支持Backpressure的,如果我们发1万条,只会增大内存,不会抛MissingBackpressureException,我们可以用2.0+版本中的Flowable

    rang

       Flowable.range(1,6).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "Consumer,"+integer);
                }
            });
    //                Consumer,1
    //                Consumer,2
    //                Consumer,3
    //                Consumer,4
    //                Consumer,5
    

    range操作符是从1开始(包括1)依次发射后面6个数,很明显可以达到我们的要求,然后我们来看看接收的consumer,点进去发现它是一个接口:

    package io.reactivex.functions;
    
    /**
     * A functional interface (callback) that accepts a single value.
     * @param <T> the value type
     */
    public interface Consumer<T> {
        /**
         * Consume the given value.
         * @param t the value
         * @throws Exception on error
         */
        void accept(T t) throws Exception;
    }
    

    那这个consumer是2.0的,其对应的也是1.0+版本中的Action,这时候又有个疑问??那有异常我怎么处理?真正在开发之中是要自定义Observer的去统一处理数据异常的,我们写demo就不用那么麻烦,如果用consumer的话,想处理异常还可以:

      Flowable.range(1, 5).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "Consumer," + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.e(TAG, throwable.toString());
                }
            });
    

    我们可以看到同一个接口用泛型来区分实现类,那之后我们为了代码更简洁,就用new consumer的方式来接收了.

    fromIterable

    除了rang之外,fromIterable也是可以的,而且这个操作符更贴切,因为其原理就是遍历集合,一个一个发射:

     Flowable.fromIterable(mList)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "Consumer," + integer);
                        }
                    });
    

    fromArray

    同理:数组

       private Integer[] mArr = {1,2,3};
        ...
       Flowable.fromArray(mArr)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "Consumer," + integer);
                        }
                    });
    

    just

    这里要不得不说一下just,这个操作符比较强大,它可以一次发送一个集合,数组,也可以一个一个的发送不同类型的数据等等,我们来举个例子:
    (为了让它一行就可以,我这边把consumer这个匿名内部类替换成lamada表达式,更直观)

            //发射集合
            Flowable.just(mList).subscribe(numberList -> Log.d(TAG, numberList.toString()));
            //        [1, 2, 3, 4, 5]
            //发射数组
            Flowable.just(mArr).subscribe(numberArr -> Log.d(TAG, Arrays.toString(numberArr)));
            //        [1, 2, 3]
            //发射字符串
            Flowable.just(1, "A", 10).subscribe(str -> Log.d(TAG, str));
            //        1
            //        A
            //        10
    
    

    defer

    这个操作符很有意思,我们之前说的just,from等等,都是在创建上游被观察者的时候确定了数据,也就是说发射的什么数据我是知道的,那如果发射的数据本身就会变呢?比如这样:

            StringBuilder deferMsg = new StringBuilder();
            deferMsg.append("张三发来贺电");
            Flowable<String> justDefer = Flowable.just(deferMsg.toString());
            deferMsg.append(",李四发来贺电");
            justDefer.subscribe(s -> Log.d(TAG, s));
          //张三发来贺电
    

    如果用defer的话

            StringBuilder deferMsg = new StringBuilder();
            deferMsg.append("张三发来贺电");
            Flowable<String> defer = Flowable.defer((Callable<Publisher<String>>) () ->
                    Flowable.just(deferMsg.toString()));
            deferMsg.append(",李四发来贺电");
            defer.subscribe(s -> Log.d(TAG, s));
            //张三发来贺电,李四发来贺电
    

    我们可以看到是订阅的时候才会走Flowable的回调方法并且会创建一个新的Flowable/Observable,而不是创建的时候走,有点类似于eventBus的sticky,有人说这个叫延迟订阅,所以defer用于处理动态的数据,保证上游的数据是最新的

    repeat

    repeat就是我们可以在发射的时候指定重复次数:

       Flowable.just(mList).repeat(2)
                           .subscribe(numbers -> Log.d(TAG, numbers.toString()));
      //[1, 2, 3, 4, 5] [1, 2, 3, 4, 5]
    

    timer

      Flowable.timer(1000, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            Log.d(TAG, "Timer:" + aLong + "=====当前线程为:"
                                    + Thread.currentThread().getName());
                        }
                    });
    //Timer:0=====当前线程为:RxComputationThreadPool-1
    

    我们可以用Timer操作符去延迟执行一个任务,比如上面延迟1秒,上游会向下游发送一个数据0,这边有两点需要注意
    1.timer的是有第三个参数的,第三个参数可以指定运行线程
    2.如果不指定,默认线程为computation,如果想更改ui,记得要做线程转换

    Interval

    上面说的timer是延迟发送,而interval是以一个固定的时间间隔不断发发发,比如一个很常见的需求,我需要每隔5分钟获取服务器信息,就可以:

       Flowable.interval(5, TimeUnit.MINUTES).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG, "Timer:" + aLong + "=====当前线程为:"
                            + Thread.currentThread().getName());
                }
            });
            // Timer:0=====当前线程为:RxComputationThreadPool-1
            // Timer:1=====当前线程为:RxComputationThreadPool-1
            // Timer:2=====当前线程为:RxComputationThreadPool-1
            // Timer:3=====当前线程为:RxComputationThreadPool-1
            // Timer:4=====当前线程为:RxComputationThreadPool-1 
        
    

    我们发现也是默认在子线程,当然,我们也可以写一个发送短信的倒计时:

            Flowable.interval(0, 1, TimeUnit.SECONDS)
                    .take(60)
                    .map(aLong -> 60 - aLong)
                    .doOnSubscribe(subscription -> tv.setClickable(false))
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(aLong -> {
                    tv.setText(aLong + "s"); });
    

    简单说一下

    1. interval第一个参数是延迟发送,我们这里用的0秒,也就是不延迟
    2. take:由于interval从0开始无限发,take就是制定发多少次,所以在这边我们让它只发60次.
    3. 由于数据是从0,1,2,3开始发,所以这里我们用map做了一个转换把数据变成了60,59,58...
    4. doOnSubscribe这个方法用于订阅之前执行,它跟doOnNext不太一样,doOnNext跟随onNext,onNext执行多少次,它就执行多少次,而doOnSubscribe只执行一次,所以,一般做初始化操作,比如我点击了按钮发送验证码,在倒计时的时候,你就不能再点击了,所以禁用点击功能.
    5. 正常情况下应该在onComplete中恢复按钮点击功能,并且修改ui为点击/获取.
    6. 以后就在也不用这写一块handler,那边写一块timer/timerTask了..

    变换

    先说用的最多的:

    map

    我们在工作中经常遇到数据处理的问题,可不是你想要什么数据形式,后台小哥哥就给的到你的,于是在我们拿到数据后,可以用map进行转换,比如:
    1.转换成另外一个集合

            Flowable.just(mArr).map(new Function<List<Integer>, List<String>>() {
                @Override
                public List<String> apply(List<Integer> mList) throws Exception {
                    List<String> newList = new ArrayList<>();
                    for (Integer number : mList) {
                        newList.add(number + "string");
                    }
                    return newList;
                }
            }).subscribe(new Consumer<List<String>>() {
                @Override
                public void accept(List<String> newList) throws Exception {
                    Log.d(TAG, newList.toString());
                }
            });
    // [1string, 2string, 3string, 4string, 5string]
    

    2.转换成一个对象

       //  将集合最后一个数字取出并放到对象里
      Flowable.just(mArr)
              .map((Function<List<Integer>, NumberTest>) mList -> {
                NumberTest numberTest = new NumberTest();
                numberTest.setNumber(mList.get(mList.size() - 1));
                return numberTest;
            }).subscribe((Consumer<NumberTest>) numberTest -> Log.d(TAG, numberTest.toString()));
        //  NumberTest{number=5}
    

      通过map操作符,我们可以在apply这个小方法里面,各种定义数据转换类型,你可以把一个对象换成另外一个对象,或者把一个对象放进一个数组,再或者把数组转换成一个集合,都在整个链式调用中间完成,代码逻辑简单易懂,最重要的是非常美观!!

    flatMap

    这个操作符跟Map相比有一点不同,我们先用flatmap写一下上面的例子:

       Flowable.just(mArr).flatMap(new Function<List<Integer>, Publisher<List<String>>>() {
                @Override
                public Publisher<List<String>> apply(List<Integer> mList) throws Exception {
                    List<String> newList = new ArrayList<>();
                    for (Integer number : mList) {
                        newList.add(number + "string");
                    }
                    return Flowable.just(newList);
                }
            }).subscribe(new Consumer<List<String>>() {
                @Override
                public void accept(List<String> newList) throws Exception {
                    Log.d(TAG, newList.toString());
                }
            });
    //[1string, 2string, 3string, 4string, 5string]
    

    跟map相比,我们可以非常明显的看到在数据转换的类型上是不一样的.
    1.map操作符:我们只需要将转换的类型作为apply方法中的返回值即可.
    2.flatMap操作符:其原理是将上游发送的数据打包,形成一个全新的Flowable/Observable,所以我们在做数据转换的时候,需要将转换完成的数据类型重新封装成Flowable.
      WTF,麻烦不是一点点啊!


    别着急.jpg

    通过上面的例子,不难发现我们全程只发送了一个集合,当然转换跟接收也就只有这一次,如果是这种情况的话,像数据转换这种小问题交给map处理就可以了,而flatMap通常可以解决两种问题:
    1.for循环嵌套,比如我要获取一个城市所有的门店中的每一名销售人员的业绩,显然有俩集合,一个门店的集合,一个销售人员的集合,那用flatMap我们就可以这样子:

     Flowable.fromIterable(cityStores)
             .flatMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {
                   return Flowable.fromIterable(cityStore.getSalesman()); })
             .subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() 
                            + "店的"
                            + salesman.getSalesManId() + "号的业绩为:"
                            + salesman.getSalesPerformance() + "元"));
    
    //    1店的0号的业绩为:58元
    //    1店的1号的业绩为:38元
    //    1店的2号的业绩为:29元
    //    2店的0号的业绩为:49元
    //    2店的1号的业绩为:51元
    //    2店的2号的业绩为:84元
    //    3店的0号的业绩为:65元
    //    3店的1号的业绩为:14元
    //    3店的2号的业绩为:44元
    

    2.网络请求嵌套,最常见的注册完以后自动登录,我们就完全可以这么写:

           ApiService.goToRegister()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext((Consumer<RegisterBean>) registerBean -> {
                        //注册完成
                    })
                    .observeOn(Schedulers.io())
                    .flatMap((Function<RegisterBean, Publisher<LoginBean>>) 
                    registerBean -> ApiService.goToLogin(account, pwd))
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe((Consumer<LoginBean>) loginBean -> {
                        //登录成功
                    });
    

    或者比如首页,在不用zip的情况下,可以先获取轮播图,再获取列表数据,再获取其他信息,不同的接口只要一条链式调用就ok,当然在实际项目中封装一下线程转换,跟统一数据格式处理,那就更简单了,清爽到没朋友.

    concatMap

    concatMap跟flatMap相比,它是有序的,上文获取销售业绩的例子,比如我们给第一个店加一个500毫秒的延迟,那么顺序就变了:

        // 2店的0号的业绩为:7元
        ...
        // 3店的2号的业绩为:20元
        ...
        // 1店的2号的业绩为:80元
    

    我们只需要将flatMap替换成concatmap就可以保证顺序

          Flowable.fromIterable(cityStores)
                    .concatMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {
    
                        int delay = 0;
                        if (1 == cityStore.getCityCtoreId()) {
                            delay = 500;
                        }
                        return Flowable.fromIterable(cityStore.getSalesman()).delay(delay, TimeUnit.MILLISECONDS);
                    })
                    .subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() + "店的"
                            + salesman.getSalesManId() + "号的业绩为:"
                            + salesman.getSalesPerformance() + "元"));
    //        1店的0号的业绩为:21元
    //        1店的1号的业绩为:2元
    //        1店的2号的业绩为:88元
    //        2店的0号的业绩为:93元
    //        2店的1号的业绩为:33元
    //        2店的2号的业绩为:44元
    //        3店的0号的业绩为:100元
    //        3店的1号的业绩为:43元
    //        3店的2号的业绩为:39元
    

    switchMap

    这个操作符我感觉用的比较少,不过既然也属于map范畴,我们就简单说一下,我们先将上面的concatMap换成switchMap,并且在变换的时候打个log,并且加上线程:

       Flowable.fromIterable(cityStores)
                    .switchMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {
    
                        int delay = 0;
                        if (1 == cityStore.getCityCtoreId()) {
                            delay = 500;
                        }
                        Log.d(TAG,"=====switchMap:"+cityStore.getCityCtoreId());
                        return Flowable.fromIterable(cityStore.getSalesman()).delay(delay,TimeUnit.MILLISECONDS);
                    })
                    .subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() + "店的"
                            + salesman.getSalesManId() + "号的业绩为:"
                            + salesman.getSalesPerformance() + "元"));
    //        =====switchMap:1|main
    //        =====switchMap:2|main
    //        2店的0号的业绩为:57元|RxComputationThreadPool-2
    //        =====switchMap:3|main
    //        3店的0号的业绩为:30元|RxComputationThreadPool-3
    //        3店的1号的业绩为:86元|RxComputationThreadPool-3
    //        3店的2号的业绩为:41元|RxComputationThreadPool-3
    

    switchMap:不难发现它的原理是当上一个订阅流程未结束时,检测到有新的发射数据了,那么上一个订阅还未走的流程会被舍弃掉,拿上面的例子举例说明:
    原数据是一共发射三个对象,当1号店睡500毫秒的时候,二号店发射了,1号店舍弃,这时候注意,所有经过变换再发射的数据统统扔到了子线程里,当2号店0号员工出了业绩,3号店发射了,那么2号店的1,2号员工被舍弃,流程直到3号店所有人员走完

    groupBy

    没错,同SQ中的groupBy差不多,就是分组的意思,也就是说把发射出去的数据按需求,分成不同的组,比如:现在我想把上面的数据分成两个组,1号店单独放在一个集合里,2,3号店放在一个集合里,那就酱:
    (不好理解的就不用lamada了)

      Flowable.fromIterable(cityStores).groupBy(new Function<CityStore, Boolean>() {
                @Override
                public Boolean apply(CityStore cityStore) throws Exception {
                    return cityStore.getCityCtoreId()==1;
                }
            }).subscribe(new Consumer<GroupedFlowable<Boolean, CityStore>>() {
                @Override
                public void accept(GroupedFlowable<Boolean, CityStore> objectCityStoreGroupedFlowable) throws Exception {
                    Boolean key = objectCityStoreGroupedFlowable.getKey();
                    Log.d(TAG,key+"");
                    objectCityStoreGroupedFlowable.toList().subscribe(new Consumer<List<CityStore>>() {
                        @Override
                        public void accept(List<CityStore> cityStores) throws Exception {
                            Log.d(TAG,cityStores.toString());
                        }
                    });
                }
            });
    //        [CityStore {
    //            cityCtoreId = 1, salesman = [Salesman {
    //                cityStoreId = 1, salesPerformance = 35
    //            }, Salesman {
    //                cityStoreId = 1, salesPerformance = 31
    //            }, Salesman {
    //                cityStoreId = 1, salesPerformance = 10
    //            }]
    //        }]
            
    //        [CityStore {
    //            cityCtoreId = 2,
    //                    salesman = [Salesman {
    //                cityStoreId = 2, salesPerformance = 37
    //            }, Salesman {
    //                cityStoreId = 2, salesPerformance = 96
    //            }, Salesman {
    //                cityStoreId = 2, salesPerformance = 12
    //            }]
    //        }, CityStore {
    //            cityCtoreId = 3,
    //            salesman = [Salesman {
    //                cityStoreId = 3, salesPerformance = 19
    //            }, Salesman {
    //                cityStoreId = 3, salesPerformance = 94
    //            }, Salesman {
    //                cityStoreId = 3, salesPerformance = 12
    //            }]
    //        }]
    

    分析:

    1. 在apply这个回调方法中,我们需要定义自己的分组标准,上面定义了1号店返回true,其余返回false
    2. 我们定义的这个标准其实就是个key,那么groupBy这个操作符会根据这个key将原来的数据,也就是3个对象,拆分成2(true/false)个Flowable,然后在分别发射其中的数据,也就需要订阅两次.

    buffer

    官方的解释的非常到位:定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值,其本质是将一个Observable变换为另一个原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合.
    我们详细说一下buffer的两个方法:

    • buffer(count)
    Flowable.fromIterable(mList).buffer(3)
                            .subscribe(integers -> Log.d(TAG,integers.toString()));
                    //        [1, 2, 3]
                    //        [4, 5]
    

    这个方法类似于分组,等于是将之前的集合分成了若干集合发送,每一个集合会小于等于count,并且没有重叠部分,如果count正好等于或者大于集合的长度,就一次性都发射了,这个比较好理解

    • buffer(count, skip)
     Flowable.fromIterable(mList).buffer(3, 2)
                    .subscribe(integers -> Log.d(TAG, integers.toString()));
            //    [1, 2, 3]
            //    [3, 4, 5]
            //    [5]
    

    这个重载的方法就有一点不好理解了,我们分成几点说一下

    1. 首先看第一个参数count,count指定了我们每一次的缓存集合长度,第一次[1,2,3]没有任何问题
    2. skip这个值得意思是,每当我缓存的时候,跳过第几个数据,我们设置的是2,第一次缓存[1,2,3]的时候,我们可以想象有一个指针跳过了第二个数据(2)指向了-->3
    3. 那么当第二次缓存的时候,指针指着3,从3开始,继续缓存count(3)个数据[3,4,5],并且skip第二个数据(4),现在的指针指向了-->5.

    然后依次类推,这样会有重叠的部分,也可能有丢掉的部分,当然buffer这个操作符还有一些其他的功能,可以戳这里,我们就不详谈了.

    window

    window这个操作符是把数据分成了每个窗口,逐次发射,实现起来跟buffer类似,但是他不是一次性发完了所有数据,其原理是把原始数据分好组以后封装成一个新的Flowable/Observable,并且由新的逐个发送数据,比如:

     Flowable.fromIterable(mList).window(3)
                    .subscribe(new Consumer<Flowable<Integer>>() {
                        @Override
                        public void accept(Flowable<Integer> integerFlowable) throws Exception {
                            integerFlowable.subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.d(TAG, integer.toString());
                                }
                            }, new Consumer<Throwable>() {
                                @Override
                                public void accept(Throwable throwable) throws Exception {
                                    Log.d(TAG, "onError");
    
                                }
                            }, new Action() {
    
                                @Override
                                public void run() throws Exception {
                                    Log.d(TAG,"onComplete");
                                }
                            });
                        }
                    });
    //        1
    //        2
    //        3
    //        onComplete
    //        4
    //        5
    //        onComplete
    

    scan

    scan这个操作符的意思是对每一个数据进行同一个函数操作,并且以生成的值作为计算标准,计算下一个数据,比如:

    Flowable.fromIterable(mList).scan(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    Log.d(TAG, "integer:" + integer);
                    Log.d(TAG, "integer2:" + integer2);
                    return integer + integer2;
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, integer.toString());
                }
            });
    //        1
    //        integer:1
    //        integer2:2
    //        3
    //        integer:3
    //        integer2:3
    //        6
    //        integer:6
    //        integer2:4
    //        10
    //        integer:10
    //        integer2:5
    //        15
    

    由此可见 apply方法中的第一个integer对应了我们每一次计算出来的和,而第二个integer2就是我们每一次发送的原数据,累加发送,依次执行..

    变换就写完了,我认为变换还是有一点不好理解的..本想一篇写完,不过觉得确实是太长了,下一篇我们会写一下剩下的操作符!

    相关文章

      网友评论

        本文标题:RxJava操作符使用指南

        本文链接:https://www.haomeiwen.com/subject/yyhtfqtx.html