RxJava操作符(一)

作者: 小白要超神 | 来源:发表于2017-09-25 14:53 被阅读286次

    注:只包含标准包中的操作符,用于个人学习及备忘
    参考博客:http://blog.csdn.net/maplejaw_/article/details/52396175

    本篇将介绍rxjava中的创建操作、合并操作、过滤操作、条件/布尔操作、聚合操作、转换操作以及变换操作,只针对用法不涉及原理,对RxJava不熟悉的可参考:http://gank.io/post/560e15be2dca930e00da1083

    创建操作

    • create:使用OnSubscrib直接创建一个Observable

       Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("item1");
                subscriber.onNext("item2");
                subscriber.onCompleted();
            }
        });
      
    • from:将数组或集合拆分成具体对象后,转换成发送这些对象的Observable

        String[] arr = {"item1", "item2", "item3"};
        Observable.from(arr)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d("debug", s);      //调用多次,分别打印出item1,item2,item3
                    }
                });
      
    • just:将一个或多个对象转换成发送这些对象的Obserbable

        Observable.just("item1","item2","item3")
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d("debug", s);      //调用多次,分别打印出item1,item2,item3
                    }
                });
      
    • empty:创建一个直接通知完成的Observable

    • error:创建一个直接通知错误的Observable

    • never:创建一个什么都不做的Observable

        Observable observable1 = Observable.empty();    //直接调用onCompleted()方法
        Observable observable2 = Observable.error(new RuntimeException());  //直接调用onError()方法
        Observable observable3 = Observable.never();    //onNext(),onCompleted(),onError()均不调用
      
    • timer:创建一个延时发射数据的Observable

        Observable.timer(1000, TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString());   //aLong为0
                    }
                });
      
    • interval:创建一个按照给定的时间间隔发射送0开始的整数序列的Obervable

        Observable.interval(2, 1, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        //等待2秒后开始发射数据,发射的时间间隔为1秒,从0开始计数
                    }
                });
      
        Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        //等待1秒后开始发射数据,发射的时间间隔为1秒,从0开始计数
                        //相当于Observable.interval(1, 1, TimeUnit.SECONDS)
                    }
                });
      
    • range:创建一个发射指定范围的整数序列的Observable

        Observable.range(3, 4)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString());//依次发射3,4,5,6,从3开始发射4个数据
                    }
                });
      
    • defer:观察者订阅时才创建Observable,每次订阅返回一个新的Observable

        Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                return Observable.just("s");
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("debug", s);      //打印s
            }
        });
      

    合并操作(用于组合多个Observavle)

    • concat:按顺序连接多个Observable,注:Observable.concat(a,b)等价于a.concatWith(b)

        Observable<Integer> observable1 = Observable.just(1, 2, 3);
        Observable<Integer> observable2 = Observable.just(4, 5, 6);
      
        Observable.concat(observable1, observable2)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,3,4,5,6
                    }
                });
      
    • startWith:在数据序列的开头增加一项数据,内部调用concat

       Observable.just(1, 2, 3)
               .startWith(Observable.just(4, 5))   //添加一个Observable
               .subscribe(new Action1<Integer>() {
                   @Override
                   public void call(Integer integer) {
                       Log.d("debug", integer.toString()); //打印4,5,1,2,3
                   }
               });
      
       Observable.just(1,2,3)
               .startWith(4,5)     //添加多个数据
               .subscribe(new Action1<Integer>() {
                   @Override
                   public void call(Integer integer) {
                       Log.d("debug", integer.toString()); //打印4,5,1,2,3
                   }
               });
      
       List<Integer> integers = new ArrayList<>();
       integers.add(4);
       integers.add(5);
       Observable.just(1,2,3)
               .startWith(integers)    //添加一个集合
               .subscribe(new Action1<Integer>() {
                   @Override
                   public void call(Integer integer) {
                       Log.d("debug", integer.toString()); //打印4,5,1,2,3
                   }
               });
      
    • merge / mergeDelayError:将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。其中mergeDelayError将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知

    merge工作流程
        Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                SystemClock.sleep(1000);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.computation());
    
        Observable<Integer> observable2 = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                SystemClock.sleep(500);
                subscriber.onNext(4);
                subscriber.onNext(5);
                SystemClock.sleep(1000);
                subscriber.onNext(6);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.computation());
    
        Observable.merge(observable1, observable2)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,4,5,2,3,6
                    }
                });
    
    • zip:使用一个函数组合多个Observable发射的数据集合,再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合
    zip工作流程
        Observable<Integer> observable1 = Observable.just(1, 2, 3, 4, 5);
        Observable<String> observable2 = Observable.just("A", "B", "C", "D");
        Observable.zip(observable1, observable2, new Func2<Integer, String, String>() {
            @Override
            public String call(Integer integer, String s) {
                return integer + s;
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("debug", s);  //打印1A,2B,3C,4D
            }
        });
    
    • combineLatest:当两个Observable中任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。类似于zip,但是,不同的是zip只有在每个Observable都发射了数据才工作,而combineLatest任何一个发射了数据都可以工作,每次与另一个Observable最近的数据压合
    combineLatest工作流程
        Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                SystemClock.sleep(500);
                subscriber.onNext(2);
                SystemClock.sleep(1000);
                subscriber.onNext(3);
                SystemClock.sleep(300);
                subscriber.onNext(4);
                SystemClock.sleep(500);
                subscriber.onNext(5);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.computation());
    
        Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                SystemClock.sleep(300);
                subscriber.onNext("A");
                SystemClock.sleep(300);
                subscriber.onNext("B");
                SystemClock.sleep(500);
                subscriber.onNext("C");
                subscriber.onNext("D");
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.computation());
    
        Observable.combineLatest(observable1, observable2, new Func2<Integer, String, String>() {
            @Override
            public String call(Integer integer, String s) {
                return integer + s;
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("debug", s);  //打印1A,2A,2B,2C,2D,3D,4D,5D
            }
        });
    

    过滤操作

    • filter:过滤数据

        Observable.just(1, 2, 3, 4)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 == 0;    //过滤偶数
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印2,4
                    }
                });
      
    • ofType:过滤指定类型数据

        Observable.just(1, "2", 3, "4")
                .ofType(Integer.class)  //过滤整形数据
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,3
                    }
                });
      
    • take:只发射前n项数据或者一定时间内的数据(无需考虑索引越界问题,配合interval操作符可作为定时器使用)

        Observable.just(1, 2, 3, 4)
                .take(2)    //只发射前2项
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2
                    }
                });
      
        Observable.interval(1, TimeUnit.SECONDS)
                .take(3, TimeUnit.SECONDS)  //只发射3秒内的数据
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        //打印0,1(打印出来的并不是相像中的0,1,2,应该与代码代码执行的时间有关,使用时需要注意!)
                        Log.d("debug", aLong.toString());   
                    }
                });
      
    • takeLast:只发射最后的N项数据或者一定时间内的数据(无需考虑索引越界问题)

        Observable.just(1, 2, 3, 4)
                .takeLast(3)    //只发射后3项
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印2,3,4
                    }
                });
      
        Observable.interval(1, TimeUnit.SECONDS)
                .take(10)   //每1秒发射一个数据,发射10秒
                .takeLast(3, TimeUnit.SECONDS)  //只发射最后3秒的数据
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString());   //打印6,7,8,9(同样存在些许误差,使用时需注意!)
                    }
                });
      
        Observable.interval(1, TimeUnit.SECONDS)
                .take(10)
                .takeLast(2, 3, TimeUnit.SECONDS)   //只发射最后3秒内的最后2个数据
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString());   //打印8,9
                    }
                });
      
    • takeFirst:只发射满足条件的第一项(其实就是filter+take)

        Observable.just(1, 2, 3, 4)
                .takeFirst(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer > 1;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印2
                    }
                });
      

    *first / firstOrDefault:只发射第一项或者满足条件的第一项数据,其中firstOrDefault可以指定默认值(建议使用firstOrDefault,找不到对应元素时first会报异常)

        Observable.just(1, 2, 3)
                .first()    //发射第一项
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1
                    }
                });
    
        Observable.just(1, 2, 3, 4)
                .first(new Func1<Integer, Boolean>() {  //发射大于2的第一项
                    @Override
                    public Boolean call(Integer integer) {
                        return integer > 2;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印3
                   }
                });
    
        Integer[] arr = {};
        Observable.from(arr)
                .firstOrDefault(2)  //发射第一项,没有可发射的数据时,发射默认值2
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印2
                    }
                });
    
    • last / lastOrDefault:只发射最后一项或满足条件的最后一项,其中lastOrDefault可以指定默认值

        Observable.just(1, 2, 3)
                .last()    //发射最后一项
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印3
                    }
                });
      
        Observable.just(1, 2, 3, 4)
                .last(new Func1<Integer, Boolean>() {  //发射大于2的最后一项
                    @Override
                    public Boolean call(Integer integer) {
                        return integer > 2;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印4
                   }
                });
      
        Integer[] arr = {};
        Observable.from(arr)
                .lastOrDefault(2)  //发射最后一项,没有可发射的数据时,发射默认值2
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印2
                    }
                });
      
    • skip:跳过开始的n项数据或者一定时间内的数据(与take类似)

        Observable.just(1, 2, 3, 4)
                .skip(2)    //跳过前2项
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString());     //打印3,4
                    }
                });
      
        Observable.interval(1, TimeUnit.SECONDS)
                .take(5)
                .skip(3, TimeUnit.SECONDS)   //跳过前3秒
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString());  //打印2,3,4,同样存在误差!
                    }
                });
      
    • skipLast:跳过最后的n项数据或一定时间内的数据

        Observable.just(1, 2, 3, 4)
                .skipLast(2)    //跳过最后2项
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString());     //打印1,2
                    }
                });
      
        Observable.interval(1, TimeUnit.SECONDS)
                .take(7)
                .skipLast(3, TimeUnit.SECONDS)   //跳过最后3秒
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString());  //打印0,1,2,同样存在误差!
                    }
                });
      
    • elementAt / elementAtOrDefault:发射某一项数据,其中elementAtOrDefault可以指定索引越界时发射的默认值

        Observable.just(1, 2, 3, 4)
                .elementAt(2)   //发射索引为2的数据
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印3
                    }
                });
      
        Observable.just(1, 2, 3)
                .elementAtOrDefault(4, 5)   //发射索引为4的数据,索引越界时发射默认数据5
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印5
                    }
                });
      
    • ignoreElements:丢弃所有数据,只发射错误或正常终止的通知,即只触发观察者的onError()或onCompleted()方法

    • distinct:过滤重复数据,可指定判定唯一的标准

        Observable.just(1, 1, 2, 3, 2, 4)
                .distinct()
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,3,4
                    }
                });
      
        Observable.just(1, 1, 2, 3, 2, 4)
                //根据发射的数据生成对应的key,通过key值来判断唯一,如果两个数据的key相同,则只发射第一个
                .distinct(new Func1<Integer, Integer>() {   
                    @Override
                    public Integer call(Integer integer) {
                        //奇数对应的key为1,偶数对应的key为2
                        if (integer % 2 == 0) {
                            return 2;
                        }
                        return 1;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2
                    }
                });
      
    • distinctUntilChanged:过滤掉连续重复的数据,可指定判定唯一的标准

        Observable.just(1, 1, 2, 3, 2, 4)
                .distinctUntilChanged()
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,3,2,4
                    }
                });
      
        Observable.just(1, 1, 2, 3, 2, 4)
                //根据发射的数据生成对应的key,通过key值来判断唯一,如果两个数据的key相同,则只发射第一个
                .distinctUntilChanged(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                        //奇数对应的key为1,偶数对应的key为2
                        if (integer % 2 == 0) {
                            return 2;
                        }
                        return 1;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,3,2
                    }
                });
      
        Observable.just(1, 1, 2, 3, 2, 4)
                //传入比较器的方式
                .distinctUntilChanged(new Func2<Integer, Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer, Integer integer2) {
                        return integer % 2 == integer2 % 2; //同为奇数或偶数返回true
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,3,2
                    }
                });
      
    • throttleFirst:定期发射Observable在该时间段发射的第一项数据

        Observable.interval(0, 500, TimeUnit.MILLISECONDS)
                .take(10)   //每500毫秒发射一次数据,发射10次
                .throttleFirst(1000, TimeUnit.MILLISECONDS) //每1秒发射该秒内发射数据中的第一项数据
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        //打印0,2,5,8(即第一秒发射0,1,第二秒发射2,3,4,第三秒发射5,6,7,第四秒发射8,9),同样存在误差!
                        Log.d("debug", aLong.toString());   
                    }
                });
      
    • throttleWithTimeout / debounce(两者使用及效果相同):发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才进行发射

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {  //依次发射1-6,发射间隔不同
                subscriber.onNext(1);
                SystemClock.sleep(500);
                subscriber.onNext(2);
                SystemClock.sleep(500);
                subscriber.onNext(3);
                SystemClock.sleep(1000);
                subscriber.onNext(4);
                SystemClock.sleep(1000);
                subscriber.onNext(5);
                SystemClock.sleep(500);
                subscriber.onNext(6);
                subscriber.onCompleted();
            }
        }).throttleWithTimeout(700, TimeUnit.MILLISECONDS)  //指定最小发射间隔时间为700毫秒
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印3,4,6
                    }
                });
      
    • sample / throttleLast(两者使用及效果相同):定期发射Observable在该时间段发射的最后一项数据,与throttleFirst相反

        Observable.interval(0, 500, TimeUnit.MILLISECONDS)
                .take(10)   //每500毫秒发射一次数据,发射10次
                .throttleLast(1000, TimeUnit.MILLISECONDS) //每1秒发射该秒内发射数据中的最后一项数据
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        //打印1,3,5,7,9(即第一秒发射0,1,第二秒发射2,3,第三秒发射4,5,第四秒发射6,7,第五秒发射8,9)
                        Log.d("debug", aLong.toString());
                    }
                });
      
    • timeout:如果指定时间内没有发射任何数据,就发射一个异常或者使用备用的Observavle

        Observable.timer(5, TimeUnit.SECONDS)
                .timeout(3, TimeUnit.SECONDS)   //超时则发射异常
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                    }
      
                    @Override
                    public void onError(Throwable e) {
                        Log.d("debug", "onError()");    //抛出异常
                    }
      
                    @Override
                    public void onNext(Long aLong) {
                        Log.d("debug", aLong.toString());
                    }
                });
      
        Observable.timer(5, TimeUnit.SECONDS)
                .timeout(3, TimeUnit.SECONDS, Observable.just(2L))  //设置备用Observable
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
      
                    }
      
                    @Override
                    public void onError(Throwable e) {
                        Log.d("debug", "onError()");
                    }
      
                    @Override
                    public void onNext(Long aLong) {
                        Log.d("debug", aLong.toString());   //发射备用Observable,打印2
                    }
                });
      

    条件 / 布尔操作

    • all:判断所有数据中是否都满足某个条件

        Observable.just(1, 2, 3, 4)
                .all(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer < 5;//所有项都小于5
                    }
                })
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        Log.d("debug", aBoolean.toString());    //打印true
                    }
                });
      
        Observable.just(1, 2, 3, 4)
                .all(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer > 2;//所有项都大于2
                    }
                })
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        Log.d("debug", aBoolean.toString());    //打印false
                    }
                });
      
    • exists:判断是否存在数据项满足某个条件

        Observable.just(1, 2, 3, 4)
                .exists(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer > 2; //存在某项大于2
                    }   
                })
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        Log.d("debug", aBoolean.toString());    //打印true
                    }
                });
      
    • contains:判断所有数据中是否包含指定的数据(内部调用exists)

        Observable.just(1, 2, 3, 4)
                .contains(2)    //是否包含2
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        Log.d("debug", aBoolean.toString());    //打印true
                    }
                });
      
    • sequenceEqual:判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)

        Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        Log.d("debug", aBoolean.toString());    //打印true
                    }
                });
      
    • isEmpty:用于判断Observable是否没有发射任何数据(发射null返回为false)

        Observable.from(new ArrayList<Integer>())  //集合中没有数据
                .isEmpty()
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        Log.d("debug", aBoolean.toString());    //打印true
                    }
                });
      
        Observable.empty()
                .isEmpty()
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        Log.d("debug", aBoolean.toString());    //打印true
                    }
                });
      
    • amber:指定多个Observable,只允许第一个开始发射数据的Observable发射全部数据,其他Observable将会被会忽略

        Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                SystemClock.sleep(500);     //延迟500毫秒
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onCompleted();
            }
        }.subscribeOn(Schedulers.computation()));  //指定为新的线程
      
        Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("a");
                subscriber.onNext("b");
                subscriber.onCompleted();
            }
        });
      
        Observable.amb(observable1, observable2)
                .subscribe(new Action1<Serializable>() {
                    @Override
                    public void call(Serializable serializable) {
                        Log.d("debug", serializable.toString());    //打印a,b
                    }
                });
      
    • switchIfEmpty:如果原始Observable正常终止后仍没有发射任何数据,就使用备用的Observable

        Observable.from(new ArrayList<Integer>())
                .switchIfEmpty(Observable.just(1, 2))
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2
                    }
                });
      
    • defaultIfEmpty:如果原始Observable正常终止后仍没有发射任何数据,就发射一个默认值(内部调用switchIfEmpty)

        Observable.from(new ArrayList<Integer>())
                .defaultIfEmpty(1)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1
                    }
                });
      
    • takeUntil:当发射的数据满足某个条件后(包含该数据),或者第二个Observable发射了一项数据或发射了一个终止通知时(观察者接受不到第二个Observable发射的数据),终止第一个Observable发送数据

        Observable.just(1, 2, 3, 4)
                .takeUntil(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer == 3;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", "just" + integer.toString());     //打印1,2,3
                    }
                });
      
        Observable.interval(0, 500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.computation())
                .takeUntil(Observable.timer(1200, TimeUnit.MILLISECONDS))
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString());   //打印0,1,2
                    }
                });
      
    • takeWhile:当发射的数据对应某个条件为false时(不包含该数据),Observable终止发送数据

        Observable.just(1, 2, 3, 4)
                .takeWhile(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer != 3;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString());     //打印1,2
                    }
                });
      
    • skipUnit:丢弃Observable发射的数据,直到第二个Observable开始发射数据或者发射一个终止通知时

        Observable.interval(0, 500, TimeUnit.MILLISECONDS)
                .take(5)
                .subscribeOn(Schedulers.computation())
                .skipUntil(Observable.timer(1200, TimeUnit.MILLISECONDS))
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString());   //打印3,4
                    }
                });
      
    • skipWhile:丢弃Observable发射的数据,直到一个指定的条件不成立(不丢弃条件数据)

        Observable.just(1, 2, 3, 4)
                .skipWhile(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer < 3;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString());     //打印3,4
                    }
                });
      

    聚合操作

    • reduce:用一个函数接收Observable发射的数据,将函数的计算结果作为下次计算的参数,最后输出结果。

        Observable.just(1, 2, 3, 4)
                .reduce(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) {
                        Log.d("debug", "integer1:" + integer + ",integer2:" + integer2);
                        return integer + integer2;  //求和操作
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", "result:" + integer);
                    }
                });
        /**
         * 日志输出
         * integer1:1,integer2:2
         * integer1:3,integer2:3
         * integer1:6,integer2:4
         * result:10
         */
      
    • collect:用于将数据收集到一个可变的数据结构(如List,Map)

        Observable.just(1, 2, 3, 4)
                .collect(new Func0<List<Integer>>() {
                    @Override
                    public List<Integer> call() {
                        return new ArrayList<Integer>();    //创建List用于收集数据
                    }
                }, new Action2<List<Integer>, Integer>() {
                    @Override
                    public void call(List<Integer> integers, Integer integer) {
                        integers.add(integer);  //将数据添加到List中
                    }
                })
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        Log.d("debug", integers.toString());    //打印[1, 2, 3, 4]
                    }
                });
      
        Observable.just(1, 2, 3, 4)
                .collect(new Func0<Map<Integer, String>>() {
                    @Override
                    public Map<Integer, String> call() {
                        return new HashMap<Integer, String>();  //创建Map用于收集数据
                    }
                }, new Action2<Map<Integer, String>, Integer>() {
                    @Override
                    public void call(Map<Integer, String> integerStringMap, Integer integer) {
                        integerStringMap.put(integer, "value" + integer);   //将数据添加到Map中
                    }
                })
                .subscribe(new Action1<Map<Integer, String>>() {
                    @Override
                    public void call(Map<Integer, String> integerStringMap) {
                        //打印{4=value4, 1=value1, 3=value3, 2=value2},注:HashMap保存的数据是无序的
                        Log.d("debug", integerStringMap.toString());    
                    }
                });
      
    • count / countLong:计算发射的数量,内部调用的是reduce

        Observable.just(1, 2, 3, 4)
                .count()
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", "integer:" + integer.toString());    //打印4
                    }
                });
      

    转换操作

    • toList:将Observable发射的所有数据收集到一个列表中,返回这个列表

        Observable.just(1, 2, 3, 4)
                .toList()
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        Log.d("debug", integers.toString());    //打印[1, 2, 3, 4]
                    }
                });
      
    • toSortedList:将Observable发射的所有数据收集到一个有序列表中,返回这个列表

        Observable.just(3, 2, 5, 4, 1)
                .toSortedList()     //默认升序排序
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        Log.d("debug", integers.toString());    //打印[1, 2, 3, 4, 5]
                    }
                });
      
        Observable.just(3, 2, 5, 4, 1)
                .toSortedList(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) {
                        return integer2 - integer;  //自定义排序规则(倒序)
                    }
                })
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        Log.d("debug", integers.toString());    //打印[5, 4, 3, 2, 1]
                    }
                });
      
    • toMap:将序列数据转换为一个Map,根据数据项生成key和value

        Observable.just(1, 2, 3, 4)
                .toMap(new Func1<Integer, String>() {   //根据数据项生成key,value为原始数据
                    @Override
                    public String call(Integer integer) {
                        return "key:" + integer;
                    }
                })
                .subscribe(new Action1<Map<String, Integer>>() {
                    @Override
                    public void call(Map<String, Integer> stringIntegerMap) {
                        Log.d("debug", stringIntegerMap.toString());    //打印{key:4=4, key:2=2, key:1=1, key:3=3}
                    }
                });
      
        Observable.just(1, 2, 3, 4)
                .toMap(new Func1<Integer, String>() {   //根据数据项生成key和value
                    @Override
                    public String call(Integer integer) {
                        return "key:" + integer;
                    }
                }, new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return "value:" + integer;
                    }
                })
                .subscribe(new Action1<Map<String, String>>() {
                    @Override
                    public void call(Map<String, String> stringStringMap) {
                        Log.d("debug", stringStringMap.toString()); //打印{key:4=value:4, key:2=value:2, key:1=value:1, key:3=value:3}
                    }
                });
      
        Observable.just(1, 2, 3, 4)
                .toMap(new Func1<Integer, String>() {   //根据数据项生成key和value,创建指定类型的Map
                    @Override
                    public String call(Integer integer) {
                        return "key:" + integer;
                    }
                }, new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return "value:" + integer;
                    }
                }, new Func0<Map<String, String>>() {
                    @Override
                    public Map<String, String> call() {
                        return new LinkedHashMap<String, String>(); //LinkedHashMap保证存取顺序相同
                    }
                })
                .subscribe(new Action1<Map<String, String>>() {
                    @Override
                    public void call(Map<String, String> stringStringMap) {
                        Log.d("debug", stringStringMap.toString());  //打印{key:1=value:1, key:2=value:2, key:3=value:3, key:4=value:4}
                    }
                });
      
    • toMultiMap:类似toMap,不同的地方在于map的value是一个集合,使一个key可以映射多个value,多用于分组

        Observable.just(1, 2, 1, 4)
                .toMultimap(new Func1<Integer, String>() {   //根据数据项生成key,value为原始数据
                    @Override
                    public String call(Integer integer) {
                        return "key:" + integer;
                    }
                })
                .subscribe(new Action1<Map<String, Collection<Integer>>>() {
                    @Override
                    public void call(Map<String, Collection<Integer>> stringCollectionMap) {
                        Log.d("debug", stringCollectionMap.toString()); //打印{key:4=[4], key:2=[2], key:1=[1, 1]}
                    }
                });
      
        Observable.just(1, 2, 1, 4)
                .toMap(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return "key:" + integer;
                    }
                })
                .subscribe(new Action1<Map<String, Integer>>() {
                    @Override
                    public void call(Map<String, Integer> stringIntegerMap) {
                        Log.d("debug", stringIntegerMap.toString());    //打印{key:4=4, key:2=2, key:1=1}
                    }
                });
      

    变换操作

    • map:对Observable发射的每一项数据都应用一个函数进行变换

        Observable.just(1, 2, 3, 4)
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return "item:" + integer;
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d("debug", s);  //打印item:1,item:2,item:3,item:4
                    }
                });
      
    • cast:在发射之前强制将Observable发射的所有数据转换为指定类型(父类强转为子类)

        List list = new ArrayList();
        Observable.just(list)
                .cast(ArrayList.class)  //将List强转为ArrayList
                .subscribe(new Action1<ArrayList>() {
                    @Override
                    public void call(ArrayList arrayList) {
      
                    }
                });
      
    • flatMap:将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部使用marge合并(可用于一对多转换或多对多转换,也可用于网络请求的嵌套)

        Observable.just(1, 2, 3)
                .flatMap(new Func1<Integer, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(Integer integer) {
                        return Observable.create(new Observable.OnSubscribe<Integer>() {
                            @Override
                            public void call(Subscriber<? super Integer> subscriber) {
                                subscriber.onNext(integer * 10);
                                subscriber.onNext(integer * 100);
                                subscriber.onCompleted();
                            }
                        });
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印10,100,20,200,30,300
                    }
                });
      
    • flatMapIterable:和flatMap作用一样,只不过生成的是Iterable而不是Observable

        Observable.just(1, 2, 3)
                .flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
                    @Override
                    public Iterable<Integer> call(Integer integer) {
                        return Arrays.asList(integer * 10, integer * 100);
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印10,100,20,200,30,300
                    }
                });
      
    • concatMap:类似于flatMap,由于内部使用concat合并,所以是按照顺序连接发射

    • switchMap:和flatMap很像,将Observable发射的数据变换为Observables集合,当原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable

        Observable.interval(0, 500, TimeUnit.MILLISECONDS)  //每500毫秒发射一次
                .take(4)
                .switchMap(new Func1<Long, Observable<String>>() {
                    @Override
                    public Observable<String> call(Long aLong) {
                        return Observable.create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(Subscriber<? super String> subscriber) {
                                subscriber.onNext(aLong + "A");
                                SystemClock.sleep(800);     //延迟800毫秒
                                subscriber.onNext(aLong + "B");
                                subscriber.onCompleted();
                            }
                        }).subscribeOn(Schedulers.newThread());
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d("debug", s);  //打印0A,1A,2A,3A,3B
                    }
                });
      
    • 与reduce很像,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值

        Observable.just(1, 2, 3, 4)
                .scan(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) {
                        Log.d("debug", "integer1:" + integer + ",integer2:" + integer2);
                        return integer + integer2;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", "result:" + integer);
                    }
                });
        /**
         * 日志输出
         * result:1
         * integer1:1,integer2:2
         * result:3
         * integer1:3,integer2:3
         * result:6
         * integer1:6,integer2:4
         * result:10
         */
      
    • groupBy:将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每个Observable发射一组不同的数据(类似于toMultiMap)

        Observable.just(1, 2, 3, 4)
                .groupBy(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {   //根据数据项生成key
                        return integer % 2 == 0 ? "偶数" : "奇数";
                    }
                })
                .subscribe(new Action1<GroupedObservable<String, Integer>>() {
                    @Override
                    public void call(GroupedObservable<String, Integer> o) {
                        o.subscribe(new Action1<Integer>() {
                            @Override
                            public void call(Integer integer) {
                                Log.d("debug", o.getKey() + ":" + integer); //打印奇数:1,偶数:2,奇数:3,偶数:4
                            }
                        });
                    }
                });
      
        Observable.just(1, 2, 3, 4)
                .groupBy(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {   //根据数据项生成key
                        return integer % 2 == 0 ? "偶数" : "奇数";
                    }
                }, new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {  //根据数据项生成value
                        return integer * 10;
                    }
                })
                .subscribe(new Action1<GroupedObservable<String, Integer>>() {
                    @Override
                    public void call(GroupedObservable<String, Integer> o) {
                        o.subscribe(new Action1<Integer>() {
                            @Override
                            public void call(Integer integer) {
                                Log.d("debug", o.getKey() + ":" + integer); //打印奇数:10,偶数:20,奇数:30,偶数:40
                            }
                        });
                    }
                });
      
    • buffer:定期从Observable收集数据到一个集合,然后将这些数据集合打包发射

        Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 1)   //count:表示从当前指针位置开始打包3个数据项到集合中,skip:表示指针向后移1位,
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        Log.d("debug", "skip" + integers.toString());   //打印[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
                    }
                });
      
        Observable.just(1, 2, 3, 4, 5)
                .buffer(3)  //每3个打包成一个集合,内部就是.buffer(3,3)
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        Log.d("debug", integers.toString());    //打印[1, 2, 3],[4, 5]
                    }
                });
      
        Observable.interval(0, 100, TimeUnit.MILLISECONDS)
                .take(5)
                .buffer(250, TimeUnit.MILLISECONDS, 2)     //将每250毫秒内发射的数据收集到多个集合中,每个集合最多存放2个数据
                .subscribe(new Action1<List<Long>>() {
                    @Override
                    public void call(List<Long> longs) {
                        //打印[0, 1],[2],[3, 4],[]
                        Log.d("debug", "count:" + longs.toString());
                    }
                });
      
        Observable.interval(0, 100, TimeUnit.MILLISECONDS)
                .take(5)
                .buffer(250, TimeUnit.MILLISECONDS)     //将每250毫秒内发射的数据收集到一个集合中,集合不限制大小
                .subscribe(new Action1<List<Long>>() {
                    @Override
                    public void call(List<Long> longs) {
                        Log.d("debug", longs.toString());   //打印[0, 1, 2],[3, 4]
                    }
                });
      
        Observable.interval(0, 100, TimeUnit.MILLISECONDS)
                .take(5)
                //从指定时间节点开始,将该节点后250毫秒内发射的数据收集的一个集合中,初始节点为0,每发射一次集合,
                //节点的时间增加150毫秒,即下一次收集数据从150毫秒开始,收集150毫秒到400毫秒之间发射的数据
                .buffer(250, 150, TimeUnit.MILLISECONDS)     
                .subscribe(new Action1<List<Long>>() {
                    @Override
                    public void call(List<Long> longs) {
                        Log.d("debug", longs.toString());   //打印[0, 1, 2],[2,3],[4]
                    }
                });
      
    • window:定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项,类似于buffer,buffer发射的是集合,而window发射的是Observable

        Observable.just(1, 2, 3, 4, 5)
                .window(3, 1)
                .subscribe(new Action1<Observable<Integer>>() {
                    @Override
                    public void call(Observable<Integer> integerObservable) {
                        integerObservable.toList()  //将所有数据搜集成一个集合,便于观察
                                .subscribe(new Action1<List<Integer>>() {
                                    @Override
                                    public void call(List<Integer> integers) {
                                        Log.d("debug", integers.toString());    //打印[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
                                    }
                                });
                    }
                });
      
        Observable.just(1, 2, 3, 4, 5)
                .window(3)  //相当于window(3,3)
                .subscribe(new Action1<Observable<Integer>>() {
                    @Override
                    public void call(Observable<Integer> integerObservable) {
                        integerObservable.toList()  //将所有数据搜集成一个集合,便于观察
                                .subscribe(new Action1<List<Integer>>() {
                                    @Override
                                    public void call(List<Integer> integers) {
                                        Log.d("debug", integers.toString());    //打印[1, 2, 3],[4, 5]
                                    }
                                });
                    }
                });
        //剩下其余重载方法也与buffer基本一样,不重复了
      

    篇幅有限,第一部分介绍到这里

    相关文章

      网友评论

      本文标题:RxJava操作符(一)

      本文链接:https://www.haomeiwen.com/subject/djhwsxtx.html