美文网首页android开发专题
RxJava操作符使用指南(二)

RxJava操作符使用指南(二)

作者: 小白兔兽型大发 | 来源:发表于2018-11-10 21:42 被阅读30次
    妹子镇楼

    过滤

    过滤类型的操作符就比较简单啦,主要就是将数据进行筛选,比如我们最常用的:

    filter

    通常用这个操作符去过滤掉不需要的数据,保证下游只接收我们想要的,比如,我现在只想要2号店的所有销售人员的数据:

       Flowable.fromIterable(cityStores)
                    .concatMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {
    
                        int delay = 0;
                        if (1 == cityStore.getCityCtoreId()) {
                            delay = 500;
                        }
                        return Flowable.fromIterable(cityStore.getSalesman()).delay(delay, TimeUnit.MILLISECONDS);
                    })
                    .filter(salesman -> salesman.getCityStoreId() == 2)
                    .subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() + "店的"
                            + salesman.getSalesManId() + "号的业绩为:"
                            + salesman.getSalesPerformance() + "元"));
    
            //    2店的0号的业绩为:60元
            //    2店的1号的业绩为:27元
            //    2店的2号的业绩为:33元
    

    完全不用写for循环判断了!!

    debounce

    翻译过来就是防抖动的意思,其含义为,当我们发送了一次数据,在规定的时间内,又发送了新的数据,那么这一次发送的数据会被丢掉,先来看个例子

    Flowable.create(new FlowableOnSubscribe<String>() {
                @Override
                public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                    emitter.onNext("张三");
                    Thread.sleep(299);
                    emitter.onNext("李四");
                    Thread.sleep(300);
                    emitter.onNext("王五");
                    Thread.sleep(350);
                    emitter.onNext("赵六");
                    Thread.sleep(250);
                    emitter.onNext("费七");
                    Thread.sleep(100);
                    emitter.onNext("陈八");
                    emitter.onComplete();
                }
            }, BackpressureStrategy.BUFFER)
                    .debounce(300, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String str) throws Exception {
                            Log.d(TAG, str);
                        }
                    });
            //         李四
            //         王五
            //         陈八
    
    

    这边我们规定的时间为300毫秒,

    • 当我们发送了张三的时候,经过了299毫秒,小于300,又发送了李四,张三丢掉
    • 当发送了李四,经过300毫秒,又发送了王五 满足条件,李四保留,王五同理,赵六,费七同样不满足条件,舍弃,陈八最后一个发送 后面没数据了,肯定也满足条件保留,所以打印出来的结果就是:李四,王五,陈八
    • 另外:如果用flowable配合create操作符,是要指定背压模式的.

    distinct

    去重,单纯的调用distinct(),就是去掉重复发射的元素

     List<Integer> distinctList = Arrays.asList(1, 1, 3, 3, 5);
            Flowable.fromIterable(distinctList)
                    .distinct()
                    .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG,integer+"");
                }
            });
              //   1
              //   3
              //   5
    

    还有一个重载的方法可以根据变量去重

            List<DistinctBean> distinctBeans = new ArrayList<>();
            distinctBeans.add(new DistinctBean("aaaa"));
            distinctBeans.add(new DistinctBean("bbbb"));
            distinctBeans.add(new DistinctBean("aaaa"));
            distinctBeans.add(new DistinctBean("cccc"));
            distinctBeans.add(new DistinctBean("bbbb"));
            Flowable.fromIterable(distinctBeans)
                    .distinct(new Function<DistinctBean, String>() {
                        @Override
                        public String apply(DistinctBean distinctBean) throws Exception {
                            return distinctBean.str;
                        }
                    })
                    .subscribe(distinctBean -> Log.d(TAG, distinctBean.toString()));
    //        DistinctBean{str='aaaa'}
    //        DistinctBean{str='bbbb'}
    //        DistinctBean{str='cccc'}
    

    也就是我们可以在apply方法中去指定一个key,来判断是否是重复的数据

    ElementAt

    获取指定位置的发射数据,索引是从0开始

     List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);
            Flowable.fromIterable(mList)
                    .elementAt(3)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, integer + "");
                        }
                    });
            //   4
    
    • IgnoreElements 如果用这个的话,就不发送任何数据了

    first

    只发射第一个数据

       List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);
            Flowable.fromIterable(mList)
                    .first(1)
                    .subscribe(integer -> Log.d(TAG, integer + ""));
            //   1
    

    如果用first的话,有个参数,这个参数是默认的item索引,也就是说原数据一直没发送,那么会发送默认索引上的值,不要参数的话可以换成firstElement

    last

    只发射最后一个数据

    Flowable.fromIterable(mList)
                    .last(1)
                    .subscribe(integer -> Log.d(TAG, integer + ""));
            //  5
    

    跟first完全一致,不要参数的话换成 lastElement

    sample

    sample就是抽样的意思,我们可以指定在某一个时间,对发射的数据进行采集,采集数据的标准时之前发射阶段的最有一个数据,比如:

      Flowable.interval(1000, TimeUnit.MILLISECONDS)
                    .sample(3000, TimeUnit.MILLISECONDS)
                    .subscribe(aLong -> Log.d(TAG, aLong + ""));
            //        1
            //        4
            //        7
            //        10 
            //        ..
    

    我们开一个定时任务,1秒钟发送一个数字(从0开始),3秒之后采集到最后一个发射数据1,再过3秒采集到了4,依次类推..

    skip/skipLast

    • skip以一个值为标准,之前的抑制,之后的发射
    Flowable.fromIterable(mList).skip(2)
          .subscribe(integer -> Log.d(TAG,integer.toString()));
            //   3
            //   4
            //   5
    

    另外,我们要说一下skip的重载方法

    1. Javadoc: skipLast(long,TimeUnit)
    2. Javadoc: skipLast(long,TimeUnit,Scheduler)

    举个例子

     Flowable.interval(1000,TimeUnit.MILLISECONDS)
                    .skip(3000,TimeUnit.MILLISECONDS)
                    .subscribe(aLong -> Log.d(TAG, aLong.toString()+",当前线程:"
                            +Thread.currentThread().getName()));
            //   3,当前线程:RxComputationThreadPool-2
            //   4,当前线程:RxComputationThreadPool-2
            //   5,当前线程:RxComputationThreadPool-2
            //   ...
    

    这里我们用到了两个参数的重载方法,我们很明显可以看到这个方法skip的值为时间值,示例为跳过前3秒的发送数据,而且输出环境在子线程里,如果想指定线程的话,可以用第三个参数进行指定.

    • skipLast与skip正相反,以一个值为标准,抑制后面的发射数据,而其重载方法与skip是一致的.
     Flowable.fromIterable(mList).skipLast(2)
          .subscribe(integer -> Log.d(TAG,integer.toString()));
            //   1
            //   2
            //   3
    

    take/takeLast

    一个是只发送前面n项,一个正相反,发送后面n项

      Flowable.fromIterable(mList).take(2)
                    .subscribe(integer -> Log.d(TAG,integer.toString()));
            //   1
            //   2
            Flowable.fromIterable(mList).takeLast(2)
                    .subscribe(integer -> Log.d(TAG,integer.toString()));
            //   4
            //   5
    

    take/takeLast也是有重载方法的,其默认线程也在子线程中(computation),跟skip没区别,我们就不在举例了

    组合

    zip

    官方翻译:zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据,什么意思?先举一个简单的小例子:

     Flowable.zip(Flowable.just("1"), Flowable.just("A", "B", "C"),
                    (s1, s2) -> s1 + "-----" + s2).subscribe(s -> Log.d(TAG, s));
      //1----A
    
    • 就是将发射的多个Flowable/Observable的数据按顺序组合在一起,并且因为第一个Flowable里面只有一个数据,所以第二个Flowable的"B","C"就都不发射了.

    zip是rxJava2.0+才有的,它可以解决了复杂页面多接口调用的问题,也就是说一个页面好多接口,如果都响应完了才展示页面的话,就不用同步去请求,我们可以用zip同时发,其原理是把多个Observable组合成新的Observable,比如首页获取个人信息,获取banner,获取活动列表三个接口,就可以:

    Flowable<BaseDataResponse<List<Banner>>> bannerFlowable = ApiService.getbanner();
    Flowable<BaseDataResponse<List<Activity>>> activityFlowable = ApiService.getActivityInfo();
    Flowable<BaseDataResponse<Member>> memberFlowable = ApiService.getMemberInfo();
            
            Flowable.zip(bannerFlowable,
                    activityFlowable,
                    memberFlowable,
                    new Function3<BaseDataResponse<List<Banner>>,
                            BaseDataResponse<List<Activity>>,
                            BaseDataResponse<Member>, Map<String, Object>>() {
                        @Override
                        public Map<String, Object> apply(BaseDataResponse<List<Banner>> bannerResponse,
                                                         BaseDataResponse<List<Activity>> activityResponse,
                                                         BaseDataResponse<Member> memberResponse) throws Exception {
                            HashMap<String, Object> map = new HashMap<>(3);
                            map.put("HOME_BANNER", bannerResponse);
                            map.put("HOME_ACTIVITY", activityResponse);
                            map.put("HOME_MEMBER", memberResponse);
                            return map;
                        }
                    }).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Map<String, Object>>() {
                        //1.正常这边应该有一个自定义的Observer,可以用subscribeWith 自定义一个Observer,处理数据异
                        //2.由于每个公司的数据形式也不统一,我们举例子就不写了
                        @Override
                        public void accept(Map<String, Object> dataMap) throws Exception {
                            BaseDataResponse < List < Banner >> BannerResponse = cast(dataMap.get("HOME_BANNER"));
                            if(BannerResponse!=null){
                                List<Banner> bannerList = BannerResponse.getData();
    //                            mView.showBanner(bannerList);
                            }
                     //       ..
                        }
                    });
    

    首先定义三个接口flowable,然后分别作为zip方法的前三个参数,而function方法中,通常会定义一个基本的数据类型封装类,其泛型为三个接口返回的数据类型,第四个是做为apply方法的返回值的,我们可以定义Map,List,Object,都没有问题,其主要是封装返回值,这里我们用的是map,
    接下来线程转换,在accept中取出每一个response 转换成我们要的数据,展示,也可以用lamada简化一下:

     Flowable.zip(bannerFlowable, activityFlowable, memberFlowable,
                    (bannerResponse, activityResponse, memberResponse) -> {
                        HashMap<String, Object> map = new HashMap<>(3);
                        map.put("HOME_BANNER", bannerResponse);
                        map.put("HOME_ACTIVITY", activityResponse);
                        map.put("HOME_MEMBER", memberResponse);
                        return map;
                    }).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(dataMap -> {
                        BaseDataResponse<List<Banner>> BannerResponse = cast(dataMap.get("HOME_BANNER"));
                        if (BannerResponse != null) {
                            List<Banner> bannerList = BannerResponse.getData();
                            //                            mView.showBanner(bannerList);
                        }
                        //        ...
                    });
    

    merge

    merge就是合并的意思,上文用zip实现的例子同样也可以用merge实现,比如:

     Flowable<BaseDataResponse<List<Banner>>> bannerFlowable = ApiService.getbanner();
            Flowable<BaseDataResponse<List<Activity>>> activityFlowable = ApiService.getActivityInfo();
            Flowable<BaseDataResponse<Member>> memberFlowable = ApiService.getMemberInfo();
            Flowable.merge(bannerFlowable, activityFlowable, memberFlowable)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<BaseDataResponse<? extends Object>>() {
                        @Override
                        public void accept(BaseDataResponse<?> baseDataResponse) throws Exception {
                            //baseDataResponse->banner
                            //baseDataResponse->activity
                            //baseDataResponse->member
                        }
                    });
    

    不同的是,zip是打包封装(我们封装成了map一并返回),而merge中的accept方法要走三次,需要我们进行分别判断,我们可以看一下merge的源码:

    @SuppressWarnings({ "unchecked", "rawtypes" })
        @CheckReturnValue
        @BackpressureSupport(BackpressureKind.FULL)
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Flowable<T> merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3) {
            ObjectHelper.requireNonNull(source1, "source1 is null");
            ObjectHelper.requireNonNull(source2, "source2 is null");
            ObjectHelper.requireNonNull(source3, "source3 is null");
            return fromArray(source1, source2, source3).flatMap((Function)Functions.identity(), false, 3);
        }
    

    很明显 返回值的是通过fromArray轮流发送在进行flatMap进行变换,再举个例子就很清楚了

            Flowable.merge(Flowable.just("1"), Flowable.just("A", "B", "C"))
                    .subscribe(s -> {
                        Log.d(TAG, s);
                        //1
                        //A
                        //B
                        //C
                    });
    

    这里有两个问题需要说一下
    1.merge是轮流发的,如果有10个要发送,那么第一个抛异常,第二个就不走了,如果想让下面的继续发,需要将merge换成MergeDelayError
    2.merge一但我们在发送过程中延迟发送那么发送的数据会显示在最后面,如果想保证顺序,需要将merge换成concat

    combineLatest

    这个操作符基本用于校验,zip是组合每一个发射的Flowable,combineLatest是只要其中一个发射的时候,他会找其他已经发射过的Flowable的最后一个数据,进行组合,比如:

            Flowable<Integer> flowable1 = Flowable.just(1, 2, 3, 4, 5);
            Flowable<String> flowable2 = Flowable.just("A", "B", "C");
            Flowable<String> flowable3 = Flowable.just("100", "200");
            Flowable.combineLatest(flowable1, flowable2, flowable3, new Function3<Integer, String, String, String>() {
                @Override
                public String apply(Integer integer, String s, String s2) throws Exception {
                    return integer + ":" + s + ":" + s2;
                }
            }).subscribe(string -> Log.d(TAG, string));
    //        5,C,100
    //        5:C:200
    

    flowable1, flowable2都已经发射过了,到flowable3,当它发射100的时候,5,"c",分别对应了最近发射的数据,所以按apply中的方式进行组合在一起输出,那实际运用当中,我可以用它进行多项条件的判定,都满足,再进行下一步

    join

    官方解读:任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据,等于加了个时间的范畴,也就是两个Observable,在发射的数据都是有有效期的,同在有效期之内,就组合,比如:

     Flowable<Long> baseFlowable = Flowable.interval(1000, TimeUnit.MILLISECONDS);
            Flowable<String> flowable = Flowable.just("A", "B", "C", "D");
            baseFlowable.join(flowable, new Function<Long, Publisher<Long>>() {
                @Override
                public Publisher<Long> apply(Long aLong) throws Exception {
                    Log.d(TAG, "===left:" + aLong);
                    return Flowable.timer(2000, TimeUnit.MILLISECONDS);
                }
            }, new Function<String, Publisher<Long>>() {
                @Override
                public Publisher<Long> apply(String s) throws Exception {
                    Log.d(TAG, "===right:" + s);
                    return Flowable.timer(5000, TimeUnit.MILLISECONDS);
                }
            }, new BiFunction<Long, String, String>() {
                @Override
                public String apply(Long aLong, String s) throws Exception {
                    return aLong + "----" + s;
                }
            }).subscribe(string -> Log.d(TAG, string));
    
            //        ===right:A
            //        ===right:B
            //        ===right:C
            //        ===right:D
            //        ==left:0
            //        0----A
            //        0----B
            //        0----C
            //        0----D
            //        ===left:1
            //        1----A
            //        1----B
            //        1----C
            //        1----D
            //        ...
            //        4----A
            //        4----B
            //        4----C
            //        4----D
            //        ===left:5
            //        ===left:6
            //        ===left:7
            //      ....
    

    我们通过观察不难发现

    • 首先将第二个flowable发射的数据依次设置了5秒的有效期
    • 然后从base开始依次发射0,1,2..并给予其2秒有效期(这个写多少都可以)
    • 所以都在5秒有效期之内,分别跟A,B,C,D进行组合,输出结果

    startWith

    在数据序列的开头插入一条指定的项

    List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);
            Flowable.fromIterable(mList).startWith(100).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, integer.toString());
                }
            });
            //      100
            //      1
            //      2
            //      ...
    

    当然,还可以这样:

       Flowable.fromIterable(mList).startWith(Arrays.asList(100,200))
                    .subscribe(integer -> Log.d(TAG, integer.toString()));
            //      100
            //      200
            //      1
            //      2
            //      ...
    
            Flowable.fromIterable(mList).startWith(Flowable.just(1000,2000))
                    .subscribe(integer -> Log.d(TAG, integer.toString()));
            //      1000
            //      2000
            //      1
            //      2
            //      ...
    

    switchOnNext

    订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个,Switch返回的这个Observable取消订阅前一个发射数据的Observable,开始发射最近的Observable发射的数据。注意:当原始Observable发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在后来那个Observable产生之后到它开始发射数据之前的这段时间里,前一个Observable发射的数据将被丢弃.
    在这边我们借鉴了一下其他人写的例子:

    Flowable<Flowable<Long>> flowable = Flowable.interval(500, TimeUnit.MILLISECONDS)
                    .map(new Function<Long, Flowable<Long>>() {
                        @Override
                        public Flowable<Long> apply(Long aLong) throws Exception {
                            Log.d(TAG, "====fu: "+aLong );
                            return Flowable.interval(0,200,TimeUnit.MILLISECONDS)
                                    .map(new Function<Long, Long>() {
                                        @Override
                                        public Long apply(Long aLong) throws Exception {
                                            Log.d(TAG, "===zi: "+aLong );
                                            return aLong * 10;
                                        }
                                    }).take(5);
                        }
                    }).take(2);
    
            Flowable.switchOnNext(flowable)
                    .subscribe(aLong -> Log.d(TAG, "onNext: SwitchOnNext  "+aLong));
    
    //        ====fu: 0
    //        ===zi: 0
    //        onNext: SwitchOnNext  0
    //        ===zi: 1
    //        onNext: SwitchOnNext  10
    //        ===zi: 2
    //        onNext: SwitchOnNext  20
    //        ====fu: 1
    //        ===zi: 0
    //        onNext: SwitchOnNext  0
    //        ===zi: 1
    //        onNext: SwitchOnNext  10
    //        ===zi: 2
    //        onNext: SwitchOnNext  20
    //        ===zi: 3
    //        onNext: SwitchOnNext  30
    //        ===zi: 4
    //        onNext: SwitchOnNext  40
    

    这个例子写的非常好,等于是一个Flowable循环嵌套,最外层只发送两次,当第二次发送数据,内层订阅之后,第一次还没来得及发的数据就被舍弃掉了.

    到这边Rxjava的操作符基本都写完了,还有一些辅助,订阅,切换线程之类的比较简单,而且我们在示例中都有使用,就不再单独拎出来一一说明了.能看到这的绝对是真爱,爱你!!!
    用的测试代码,请戳这里

    相关文章

      网友评论

        本文标题:RxJava操作符使用指南(二)

        本文链接:https://www.haomeiwen.com/subject/buhtfqtx.html