美文网首页
RxJava 操作符(过滤、组合)

RxJava 操作符(过滤、组合)

作者: IT一书生 | 来源:发表于2018-05-30 15:07 被阅读78次

    过滤操作符

    过滤操作符是过滤和选择Observable发射的数据序列,让Observable只返回满足条件的数据。

    • filter
      filter 操作符是对原Observable产生的结果自定义规划进行过滤,只有满足条件才会提交给订阅者
           Observable.just(1, 2, 3, 5)
                    .filter(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer integer) {
                            // 限制条件
                            return integer > 2;
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e("zpan", " filter =" + integer);
                        }
                    });
    
    E/zpan:  filter =3
    E/zpan:  filter =5
    
    • elementAt
      elementAt 操作符用来返回指定位置的数据。
      和它类似的有elementAtOrDefault(int,T),它可以有默认值
          Observable.just(1, 2, 3, 4)
                    .elementAt(3)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e("zpan", " element =" + integer);
                        }
                    });
    
    E/zpan:  element =4
    
    • distinct
      distinct 操作符用来去重,其只允许还没有发射过的数据项通过。
      和它类似的还有 distinctUntilChanged操作符,其用来去掉连续重复的数据。
            Observable.just(1, 4, 8, 5, 5, 1, 5, 6, 8)
                    .distinct()
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e("zpan", " distinct=" + integer);
                        }
                    });
    
    E/zpan:  distinct=1
         distinct=4
         distinct=8
         distinct=5
         distinct=6
    
    • skip
      skip 操作符将原Observable发射的数据过滤掉前n项。
    • skipLast
      skipLast 操作符将原Observable发射的数据从后面进行过滤操作
            Observable.just(1, 2, 3, 4, 5)
                    .skip(2)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e("zpan", " skip = " + integer);
                        }
                    });
    
    E/zpan:  skip = 3
         skip = 4
         skip = 5
    
    • take
      take 操作符将原Observable发射的数据只取前n项
    • takeLast
      takeLast 操作符将原Observable发射的数据从后面进行过滤操作
            Observable.just(1, 2, 3, 4, 5)
                    .take(2)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e("zpan", " take = " + integer);
                        }
                    });
    
    E/zpan:  take = 1
         take = 2
    
    • ignoreElements
      ignoreElements 操作符忽略所有原Observable产生的结果,只把Observable的OnCompleted和onError事件通知给订阅者
            Observable.just(1, 2, 3, 4)
                    .ignoreElements()
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onCompleted() {
                            Log.e("zpan", "==onCompleted=====");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("zpan", "==onError=====");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.e("zpan", "==onNext=====");
                        }
                    });
    
    E/zpan: ==onCompleted=====
    
    • ofType
      过滤指定类型的数据
            Observable.just(1, 2, "3", "4")
                    .ofType(String.class)
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Log.e("zpan", "ofType = " + s);
                        }
                    });
    
    E/zpan: ofType = 3
        ofType = 4
    
    • first/firstOrDefault
      只发射满足条件的第一项,后者可以设置默认值。
    • last/lastOrDefault
      只发射满足条件的最后一项,后者可以设置默认值。
            Observable.just(1, 2, 3, 4)
                    .first(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer > 2;
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e("zpan"," first =" + integer);
                        }
                    });
    
    E/zpan:  first =3
    
    • throttleFirst
      throttleFirst 操作符会定期发射这个时间段里原Observable发射的第一个数据,throttleFirst 操作符默认在computation调度器上执行。与throttleFirst类似的有sample操作符,它会定时的发射源Observable最近发射的数据,其他都会被过滤掉。
           Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    for (int i = 0; i < 10; i++) {
                        subscriber.onNext(i);
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        subscriber.onCompleted();
                    }
                }
            })
                    // 每隔100ms发射一个数据,throttleFirst设定的时间是200ms,所以它会发射每个200ms内的第一个数据
                    .throttleFirst(200, TimeUnit.MILLISECONDS)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e("zpan", " throttleFirst=" + integer);
                        }
                    });
    

    组合操作符

    组合操作符可以同时处理多个Observable来创建我们所需要的Observable。

    • startWith
      startWith 操作符会在原Observable发射的数据前面插上一些数据。
            Observable.just(4,5,6)
                    .startWith(1,2)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e("zpan"," startWith = " + integer);
                        }
                    });
    
    E/zpan:  startWith = 1
         startWith = 2
         startWith = 4
         startWith = 5
         startWith = 6
    
    • merge
      merge 操作符将多个Observable合并到一个Observable中进行发射,merge可能会让合并的Observable发射的数据交错。
    Observable<Integer> o1 = Observable.just(1, 2, 3);
            Observable<String> o2 = Observable.just("7", "8", "9");
            Observable.merge(o1,o2)
                    .subscribe(new Action1<Serializable>() {
                        @Override
                        public void call(Serializable serializable) {
                            Log.e("zpan","merge = " + serializable.toString());     
                        }
                    });
    
    E/zpan: merge = 1
        merge = 2
        merge = 3
        merge = 7
        merge = 8
        merge = 9
    
    • concat
      类似于merge,只是concat严格按照顺序发射数据。
            Observable<Integer> o1 = Observable.just(1, 2);
            Observable<Integer> o2 = Observable.just(6, 7);
            Observable.concat(o1, o2)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e("zpan", " concat = " + integer);
                        }
                    });
    
    E/zpan:  concat = 1
         concat = 2
         concat = 6
         concat = 7
    
    • zip
      zip操作符合并两个或多个Observable发射出的数据,根据指定的函数变换它们,并发射一个新的值。
            Observable<Integer> o1 = Observable.just(1, 2);
            Observable<String> o2 = Observable.just("- A", "- B");
            Observable.zip(o1, o2, new Func2<Integer, String, Object>() {
                @Override
                public Object call(Integer integer, String s) {
                    return integer + s;
                }
            })
                    .subscribe(new Action1<Object>() {
                        @Override
                        public void call(Object o) {
                            Log.e("zpan", " zip = " + o);
                        }
                    });
    
    E/zpan:  zip = 1- A
    E/zpan:  zip = 2- B
    
    • CombineLatest
      combineLatest 操作符和zip操作符类似。
      将两个Observale最近发射的数据已经Func2函数的规则进展组合。
            Observable<Integer> o1 = Observable.just(1, 2);
            Observable<String> o2 = Observable.just("- A", "- B");
            Observable.combineLatest(o1, o2, new Func2<Integer, String, Object>() {
                @Override
                public Object call(Integer integer, String s) {
                    return integer + s;
                }
            })
                    .subscribe(new Action1<Object>() {
                        @Override
                        public void call(Object o) {
                            Log.e("zpan", " combineLatest= " + o);
                        }
                    });
    
    E/zpan:  combineLatest = 2- A
    E/zpan:  combineLatest = 2- B
    
    • join
      结构:
      onservableA.join(observableB, 控制observableA发射数据有效期的函数, 控制observableB发射数据有效期的函数,两个observable发射数据的合并规则)
      join操作符的效果类似于排列组合,把第一个数据源A作为基座窗口,他根据自己的节奏不断发射数据元素,第二个数据源B,每发射一个数据,我们都把它和第一个数据源A中已经发射的数据进行一对一匹配;
            Observable<Integer> o1 = Observable.just(1, 2);
            Observable<String> o2 = Observable.just("- c", "- d");
            o1.join(o2, new Func1<Integer, Observable<Long>>() {
                @Override
                public Observable<Long> call(Integer integer) {
                    return Observable.timer(1, TimeUnit.SECONDS);
                }
            }, new Func1<String, Observable<Long>>() {
                @Override
                public Observable<Long> call(String s) {
                    return Observable.timer(0, TimeUnit.SECONDS);
                }
            }, new Func2<Integer, String, Object>() {
                @Override
                public Object call(Integer integer, String s) {
                    return integer + s;
                }
            })
                    .subscribe(new Action1<Object>() {
                        @Override
                        public void call(Object o) {
                            Log.e("zpan", " join= " + o);
                        }
                    });
    
    E/zpan:  join= 2- c
         join= 1- c
         join= 2- d
         join= 1- d
    

    相关文章

      网友评论

          本文标题:RxJava 操作符(过滤、组合)

          本文链接:https://www.haomeiwen.com/subject/nwdtsftx.html