美文网首页
Rxjava2.0笔记-004-合并,过滤操作符实际应用

Rxjava2.0笔记-004-合并,过滤操作符实际应用

作者: ccccccal | 来源:发表于2018-01-11 12:06 被阅读118次

    关于合并数据源:之前使用了flatMap()以及concatMap()进行嵌套调用,,注册之后登陆

    合并数据源2:合并数据(获取图书详情以及评论)统一展示到客户端:采用merge()或者zip()操作符

    merge()例子:实现较为简单的从(网络+本地)获取数据,,统一展示

    zip()例子:结合Retrofit以及Rxjava,实现多个网络请求合并获得数据,,统一展示

    二者区别为:merge()只添加被观察者合并数据源的操作在observable观察者的onnext()里面处理,进行合并,合并的结果在onComplete()处理,zip()可以直接添加发射者,再添加合并数据源的bean,在转主线程,订阅,可以使用new Consumer<Bean>() )里面处理合并结果

    /**
         * 合并发射者,按时间线执行
         * 合并事件,还是merge()比较方便好用
         */
    
        String resultss = "数据源来自:";
    
        private void merge() {
    
    //        Observable.merge(
    //                //延迟发送操作符
    //                //从0开始发送,发送3个数据,第一次发件延迟时间1秒。间隔时间1s
    //                //
    //                Observable.intervalRange(0,3,1,1,TimeUnit.SECONDS),
    //                Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS)
    //        ).subscribe(aLong -> {
    //
    //        });
    
            Observable.merge(
                    Observable.just("网络"),
                    Observable.just("本地文件")
            ).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                }
                @Override
                public void onNext(String s) {
                    resultss += s;
                }
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onComplete() {
    
                    KLog.d(TTAG, "接收完成统一处理事件:" + resultss);
                }
            });
        }
    

    下面使用zip操作:

      /**
         * 合并数据源
         */
        private void zip() {
    
            Observable.zip(
                    retrofitApi.getCall().subscribeOn(Schedulers.io()),
                    retrofitApi.getCall().subscribeOn(Schedulers.io()),
                    (translation, translation2) ->
                            translation.toString() + translation2.toString())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(s -> {
    
                        KLog.d(TTAG, "合并的数据源是:" + s.toString());
                    }, throwable -> {
    
                    });
        }
    

    concat()实例

     /**
         * 该类型的操作符的作用 = 组合多个被观察者
         * 组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
         * concat()
         * concatArray()
         * 
         * 实例:从内存以及磁盘和网络获取缓存
         */
    
        String memoryCache = null;
        String diskCache = "磁盘缓存数据";
    
        private void concat() {
    
        
            Observable.concat(
                    Observable.create(emitter -> {
    
                        //判断内存是否含有缓存
                        if (null == memoryCache) {
                            emitter.onComplete();
                        } else {
                            emitter.onNext(memoryCache);
                        }
                    }),
                    Observable.create(emitter -> {
    
                        //判断磁盘
                        if (null == diskCache) {
                            emitter.onComplete();
                        } else {
                            emitter.onNext(diskCache);
                        }
                    }),
                    Observable.create((ObservableOnSubscribe<String>) emitter -> {
    
                        emitter.onNext("从网络获取缓存数据");
                    })
                    //通过firstElement(),从串联队列中取出并发送第1个有效事件(Next事件),即依次判断检查memory、disk、network
            ).firstElement()
                    // 即本例的逻辑为:
                    // a. firstElement()取出第1个事件 = memory,即先判断内存缓存中有无数据缓存;由于memoryCache = null,即内存缓存中无数据,所以发送结束事件(视为无效事件)
                    // b. firstElement()继续取出第2个事件 = disk,即判断磁盘缓存中有无数据缓存:由于diskCache ≠ null,即磁盘缓存中有数据,所以发送Next事件(有效事件)
                    // c. 即firstElement()已发出第1个有效事件(disk事件),所以停止判断。
    
                    .subscribe(s -> {
    
                        KLog.d(TTAG, "缓存获得路径是:" + s.toString());
                    });
        }
    

    combineLatest()实例

    进行多个输入框判断,有一个为空时按钮不可点击,都不为空时才可以点击(并且改变输入框颜色)

     /**
         * 通过combineLatest()合并事件 & 联合判断
         * <p>
         * 当两个Observables中的任何一个发送了数据后,
         * 将先发送了数据的Observables 的最新(最后)一个数据 与
         * 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
         */
        private void init() {
    
            nameObser = RxTextView.textChanges(name).skip(1);
            ageObser = RxTextView.textChanges(age).skip(1);
            jobObser = RxTextView.textChanges(job).skip(1);
    
            Observable.combineLatest(nameObser, ageObser, jobObser,
                    (charSequence, charSequence2, charSequence3) -> {
                        boolean nameIsNOtEmpty = !TextUtils.isEmpty(name.getText());
    
                       // boolean nameIs = !TextUtils.isEmpty(name.getText()) && name.getText().toString().length() <= 10;
                        boolean ageIsNotEmpty = !TextUtils.isEmpty(age.getText());
                        boolean jobIsNotEmpty = !TextUtils.isEmpty(job.getText());
    
                        return nameIsNOtEmpty && ageIsNotEmpty && jobIsNotEmpty;
                    }
            ).subscribe(aBoolean -> {
    
                KLog.d(TTAG, "点击结果是:" + aBoolean);
                push.setEnabled(aBoolean);
            });
        }
    

    有条件的轮询操作:

    使用关键字:repeatWhen

    // 设置变量 = 模拟轮询服务器次数
        private int i = 0 ;
        /**
         * 有条件的轮询
         * 使用操作符:repeatWhen
         */
        private void init3() {
    
    
            RetrofitApi retrofitApi = OkHttpUtils.newInstance().create(RetrofitApi.class);
    
            retrofitApi.getCall()
                    .repeatWhen(objectObservable -> {
    
                        // 将原始 Observable 停止发送事件的标识(Complete() /  Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
                        // 以此决定是否重新订阅 & 发送原来的 Observable,即轮询
                        // 此处有2种情况:
                        // 1. 若返回1个Complete() /  Error()事件,则不重新订阅 & 发送原来的 Observable,即轮询结束
                        // 2. 若返回其余事件,则重新订阅 & 发送原来的 Observable,即继续轮询
    
                        return objectObservable.flatMap((Function<Object, ObservableSource<?>>) o -> {
    
                            // 加入判断条件:当轮询次数 = 5次后,就停止轮询
                            if (i>3){
                                return Observable.error(new Throwable("轮询结束"));
                            }
                            // 若轮询次数<4次,则发送1Next事件以继续轮询
                            // 注:此处加入了delay操作符,作用 = 延迟一段时间发送(此处设置 = 2s),以实现轮询间间隔设置
    
                            return Observable.just(1).delay(2000, TimeUnit.MILLISECONDS);
                        });
                    }).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Translation>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Translation translation) {
    
                            translation.show();
                            i++;
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
    
        }
    
    

    有条件的网络请求出错,重试,可以设置条件

     /**
         * 请求出错去重复查询,可以设置条件
         * 使用操作符:retryWhen
         * 发送网络请求 & 通过retryWhen()进行重试
         * 主要异常才会回调retryWhen()进行重试
         *   参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
         */
    
        // 设置变量
        // 可重试次数
        private int maxConnectCount = 10;
        // 当前已重试次数
        private int currentRetryCount = 0;
        // 重试等待时间
        private int waitRetryTime = 0;
    
        private void init4() {
    
            retrofitApi.getCall()
                    .retryWhen(throwableObservable ->
    
                            throwableObservable.flatMap((Function<Throwable, ObservableSource<?>>) throwable -> {
    
                                if (throwable instanceof IOException) {
    
                                    if (currentRetryCount < maxConnectCount) {
                                        currentRetryCount++;
                                        waitRetryTime = 1000 + currentRetryCount * 1000;
    
                                        return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
                                    } else {
    
                                        return Observable.error(new Throwable("超过重试次数:" + currentRetryCount));
                                    }
                                } else {
                                    return Observable.error(new Throwable("发生异常,非网络"));
                                }
                            }))
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Translation>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                        }
    
                        @Override
                        public void onNext(Translation translation) {
                            translation.show();
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
        }
    

    有关过滤操作符

    ofType

      /**
         * 过滤操作符
         */
        private void useOfType() {
    
            Observable.just(1, "asd", 2, 3, 4, "qwe")
                    .ofType(Integer.class)
                    .subscribe(integer -> {
    
                        KLog.d(TTAG, "获得的整型消息事件是:" + integer);
    
                    });
        }
    

    Skip,,,SkipLast

      /**
         * 跳转开头和跳过结尾消息
         */
        private void userSkipAndSkipLast() {
    
            // 使用1:根据顺序跳过数据项
            Observable.just(1, 2, 3, 4, 5)
                    .skip(1) // 跳过正序的前1项
                    .skipLast(2) // 跳过正序的后2项
                    .subscribe(integer -> KLog.d(TTAG, "获取到的整型事件元素是: " + integer));
    
    // 使用2:根据时间跳过数据项
            // 发送事件特点:发送数据0-5,每隔1s发送一次,每次递增1;第1次发送延迟0s
            Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
                    .skip(1, TimeUnit.SECONDS) // 跳过第1s发送的数据
                    .skipLast(1, TimeUnit.SECONDS) // 跳过最后1s发送的数据
                    .subscribe(along -> KLog.d(TTAG, "获取到的整型事件元素是: " + along));
        }
    
    

    throttleFirst(),,throttleLast()

    在某段时间内,只发送该段时间内第1次事件 / 最后1次事件

    <<- 在某段时间内,只发送该段时间内第1次事件 ->>
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    // 隔段事件发送时间
                    e.onNext(1);
                    Thread.sleep(500);
                    
                    e.onNext(2);
                    Thread.sleep(400);
                    
                    e.onNext(3);
                    Thread.sleep(300);
                   
                    Thread.sleep(300);
                    e.onComplete();
                }
            }).throttleFirst(1, TimeUnit.SECONDS)//每1秒中采用数据
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "接收到了事件"+ value  );
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
                    });
    
    
    <<- 在某段时间内,只发送该段时间内最后1次事件 ->>
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    // 隔段事件发送时间
                    e.onNext(1);
                    Thread.sleep(500);
    
                    e.onNext(2);
                    Thread.sleep(400);
    
                    Thread.sleep(300);
                    e.onComplete();
                }
            }).throttleLast(1, TimeUnit.SECONDS)//每1秒中采用数据
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            KLog.d(TTAG, "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            KLog.d(TTAG, "接收到了事件"+ value  );
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            KLog.d(TTAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                           K Log.d(TTAG, "对Complete事件作出响应");
                        }
                    });
    

    实际应用:规定时间内,多次点击按钮禁止多次操作使用throttleFirst,操作符

            RxView.clicks(button)
                    .throttleFirst(2, TimeUnit.SECONDS)  // 才发送 2s内第1次点击按钮的事件
                    .subscribe(new Observer<Object>() {
                        @Override
                        public void onSubscribe(Disposable d) {  
                        }
                        @Override
                        public void onNext(Object value) {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onComplete() {
                        }
                    });
    

    Sample()实例应用实时搜索

    在某段时间内,只发送该段时间内最新(最后)1次事件,与 throttleLast() 操作符类似

    throttleWithTimeout () / debounce()

    发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据

            RxTextView.textChanges(ed)
                    .debounce(1, TimeUnit.SECONDS)
                    .skip(1) //跳过 第1次请求 = 初始输入框的空字符状态
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<CharSequence>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                        }
                        @Override
                        public void onNext(CharSequence charSequence) {
                            tv.setText("发送给服务器的字符 = " + charSequence.toString());
                        }
                        @Override
                        public void onError(Throwable e) {
                         
                        }
                        @Override
                        public void onComplete() {
                          }
                    });
    

    firstElement() ,, lastElement()

    仅选取第1个元素 ,,最后一个元素

    // 获取第1个元素
            Observable.just(1, 2, 3, 4, 5)
                      .firstElement()
                      .subscribe(new Consumer<Integer>() {
                          @Override
                          public void accept( Integer integer) throws Exception {
                              KLog.d(TTAG,"获取到的第一个事件是: "+ integer);
                          }
            });
    
    // 获取最后1个元素
            Observable.just(1, 2, 3, 4, 5)
                    .lastElement()
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept( Integer integer) throws Exception {
                            KLog.d(TTAG,"获取到的最后1个事件是: "+ integer);
                        }
                    });
    

    elementAt()

    指定接收某个消息,根据索引,可以设置默认消息

      private void userEleMentAt() {
            // 使用1:获取位置索引 = 2的 元素
            // 位置索引从0开始
            Observable.just(1, 2, 3, 4, 5)
                    .elementAt(2)
                    .subscribe(integer -> KLog.d(TTAG,"获取到的事件元素是: "+ integer));
    
    // 使用2:获取的位置索引 > 发送事件序列长度时,设置默认参数
            Observable.just(1, 2, 3, 4, 5)
                    .elementAt(6,10)
                    .subscribe(integer -> KLog.d(TTAG,"获取到的事件元素是: "+ integer));
        }
    
    

    elementAtOrError()

    在elementAt()的基础上,当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常

     private void userElementAtOrError() {
            Observable.just(1, 2, 3, 4, 5)
                    .elementAtOrError(6)
                    .subscribe(integer -> KLog.d(TTAG,"获取到的事件元素是: "+ integer));
    
        }
    

    相关文章

      网友评论

          本文标题:Rxjava2.0笔记-004-合并,过滤操作符实际应用

          本文链接:https://www.haomeiwen.com/subject/azyznxtx.html