RxJava 学习进行中-Scan&Debounce&a

作者: 郑鸿翊 | 来源:发表于2016-07-06 14:38 被阅读1072次
    RX

    1.Scan

    连续地对数据序列的每一项应用一个函数,然后连续发射结果
    操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator

    看完文档,感觉Scan叫的太牵强,不过自己也想不出别这个更好的词来,当然accumulator更加得形象。它接受的参数可以只是一个固定有两个参数一个输出的Func2<>

            Character[] strs = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h','i'};
            Observable<Character> origin = Observable.from(strs);
    
            origin.scan("",new Func2<String, Character, String>() {
    
                @Override
                public String call(String s, Character character) {
                    String result = s+character.charValue();
                    Log.d("scan","拼接字符 "+s +" + "+character.charValue() +" = " +result);
                    return result;
                }
            }).subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(String result) {
                    Log.d("scan","接收到的 result = "+result);
                }
            });
    

    可以看到Scan还有一个参数,这里是一个空字符串,这是另一种形式,第一个参数是你预先设定好的初值,第二个参数则是你的函数,初值会作为第一个值加入到方法中去。

    注意

    A = C , B随意
        scan("",new Func2<A, B, C>() {
                @Override
                public C call(A a,  B b) {
                    return .....;
                }
            })
    

    这是因为Scan会接受上一个scan返回值作为第一个参数,当然如果你写不一样了,AS会提醒你的,一片红色波浪线...

    运行结果

    2.Debounce & ThrottleWithTimeout

    先贴上官方说明

    仅在过了一段指定的时间还没发射数据时才发射一个数据,会过滤掉发射速率过快的数据项。

    一开我的理解是这样的,在一定时间段内发送在这个时间段内输出的最后一个数据

    如下图

    错误的理解

    于是乎,我写了以下代码做测试

            Observable.interval(1, TimeUnit.SECONDS)//每隔1秒发送一个Long型的从0自增的数
                    .debounce(3, TimeUnit.SECONDS)
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            Log.d("debounce","aLong num = "+aLong);
                        }
                    });
    

    然而,打印出来的是一片空白。啥都没有!我懵逼了,跟想象中不一样啊!

    于是乎请教了搜索引擎,找到呼啸而过11写的代码

       Observable.create(new Observable.OnSubscribe<Integer>() {
    
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if(subscriber.isUnsubscribed()) return;
                try {
                    //产生结果的间隔时间分别为100、200、300...900毫秒
                    for (int i = 1; i < 10; i++) {
                        subscriber.onNext(i);
                        Thread.sleep(i * 100);
                    }
                    subscriber.onCompleted();
                }catch(Exception e){
                    subscriber.onError(e);
                }
            }
        }).subscribeOn(Schedulers.newThread())
                .debounce(400, TimeUnit.MILLISECONDS)  //超时时间为400毫秒
                .subscribe(
                        new Action1<Integer>() {
                            @Override
                            public void call(Integer integer) {
                                Log.d("debounce","onnext aLong num = "+integer);
                            }
                        }, new Action1<Throwable>() {
                            @Override
                            public void call(Throwable throwable) {
                                Log.d("debounce","onerror ");
                            }
                        }, new Action0() {
                            @Override
                            public void call() {
                                Log.d("debounce","oncompleted ");
                            }
                        });
    

    运行结果如下:
    Next:4
    Next:5
    Next:6
    Next:7
    Next:8
    Next:9
    completed!

    恍然大悟,其真正含义是在输出了一个数据后的一段时间内,没有再次输出新的数据,则把这个数据真正的发送出去;假如在这段时间内有新的数据输出,则以这个数据作为将要发送的数据项,并且重置这个时间段,重新计时。

    正确的理解

    我把之前的测试代码按照我的理解修改了一下

    /**debounce设置时间间隔为3秒,interval每一秒发送一个数据项,中间用filter过滤掉了第1、2、3,6、
    7,11、12、13、14秒的数据项,就是为了看第0项,第5项,第10项能否发送,按理解,在3秒的时间间隔
    内没有新数据项到达才会发送最后一个数据项,即0、10项是可以被发送的,而5不可以;y以及使用map打印
    出真正到达debounce的数据项,方便理解
    */
            Observable.interval(1, TimeUnit.SECONDS)//每隔1秒发送一个Long型的从0自增的数
                    .filter(new Func1<Long, Boolean>() {
                        @Override
                        public Boolean call(Long aLong) {
    
                            if(aLong==1) return false;
                            if(aLong==2) return false;
                            if(aLong==3) return false; //发送0
    
                            if(aLong==6) return false;
                            if(aLong==7) return false; //不能发送5
    
                            if(aLong==11) return false;//发送 10
                            if(aLong==12) return false;
                            if(aLong==13) return false;
                            if(aLong==14) return false;
                            return true;
                        }
                    })
                    .map(new Func1<Long, Long>() {
    
                        @Override
                        public Long call(Long aLong) {
                            Log.d("debounce","发送了 "+aLong);
                            return aLong;
                        }
                    })
                    .debounce(3, TimeUnit.SECONDS)
                    .subscribe(new Action1<Long>() {
                                    @Override
                                    public void call(Long aLong) {
                                        Log.d("debounce","aLong num = "+aLong);
                                    }
                                });
    
    
    运行结果

    Debounce有另一种形式,使用一个Func1<?, Observable<?>>的函数来限制发送的数据。

    来自bobo_wang的代码

       Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    
                .debounce(new Func1<Integer, Observable<Integer>>() {
                              @Override
                              public Observable<Integer> call(final Integer integer) {
                                  return Observable.create(new Observable.OnSubscribe<Integer>() {
                                      @Override
                                      public void call(Subscriber<? super Integer> subscriber) {
                                          //如果%2==0,则发射数据并调用了onCompleted结束,则不会被丢弃
                                          if (integer % 2 == 0 && !subscriber.isUnsubscribed()) {
                                              subscriber.onNext(integer);
                                              subscriber.onCompleted();
                                          }
                                      }
                                  });
                              }
                          }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debounce","integer = "+integer);
                    }
                });
    

    输出为
    debounce:2
    debounce:4
    debounce:6
    debounce:8
    debounce:9

    ThrottleWithTimeout则只有跟使用时间参数来限流的Debounce一样的功能。

    这两个操作符的理解没有那么容易,需要写多几个例子来加深自己的印象。

    相关文章

      网友评论

        本文标题:RxJava 学习进行中-Scan&Debounce&a

        本文链接:https://www.haomeiwen.com/subject/jticjttx.html