美文网首页
RxJava第二篇,操作符Opreators

RxJava第二篇,操作符Opreators

作者: Man不经心 | 来源:发表于2018-08-10 11:08 被阅读0次

    操作符(Operators):

    其实质是函数式编程中的高阶函数,是对响应式编程的各个过程拆分封装后的产物。以便于我们操作数据流。
    按照其作用具体可分为以下几类:

    • 创建:创建一个可观察对象Observable并发射数据
    • 过滤:从Observable发射的数据中取出特定的值
    • 变换:对Observable发射的数据执行变换操作
    • 组合:组合多个Observable,例如:{1,2,3}+{4,5,6}-->{1,2,3,4,5,6}
    • 聚合:聚合多个Observable,例如:{1,2,3}+{4,5,6}-->{[1,4],[2,5],[3,6]}

    创建

    • create:基础创建操作符
    • just:创建一个Observable,可接受一个或多个参数,将每个参数逐一发送
    • romArray:创建一个Observable,接受一个数组,并将数组中的数据逐一发送
    • fromIterable:创建一个Observable,接受一个可迭代对象,并将可迭代对象中的数据逐一发送
    • range:创建一个Observable,发送一个范围内的整数序列
    Observable.just("hello world");//发送一个字符串"hello world"
            Observable.just(1,2,3,4);//逐一发送1,2,3,4这四个整数
    
            Observable.range(0,5)
                    .subscribe(new Consumer<Integer>() {
                        @Override public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
    

    过滤(filter与distinct)

    • filter:filter使用Predicate 函数接口传入条件值,来判断Observable发射的每一个值是否满足这个条件,如果满足,则继续向下传递,如果不满足,则过滤掉。
    • distinct:过滤掉重复的数据项,过滤规则为:只允许还没有发射过的数据项通过。
        private static void filterAndDistinct() {
            //filter 过滤
            Observable.range(0,10)
                    .filter(new Predicate<Integer>() {
                        @Override public boolean test(Integer integer) throws Exception {
                            return integer%3==0;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
            //distinct 去重
            Observable.just(1,1,2,3,4,5,6,5,2)
                    .distinct()
                    .subscribe(new Consumer<Integer>() {
                        @Override public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
            //filter && distinct 去重取偶数
            Observable.just(1,1,2,3,4,5,6,5,2)
                    .distinct()
                    .filter(new Predicate<Integer>() {
                        @Override public boolean test(Integer integer) throws Exception {
                            return integer%2==0;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
    
        }
    

    变换(map与flatMap)

    • map:对Observable发射的每一项数据应用一个函数,执行变换操作
      map操作符,需要接收一个函数接口Function<T,R>的实例化对象,实现接口内R apply(T t)的方法,在此方法中可以对接收到的数据t进行变换后返回。
    • flatMap:将一个发射数据的Observable变换为多个Observable,然后将多个Observable发射的数据合并到一个Observable中进行发射
        private static void mapAndFlatmap() {
            //map
            Observable.range(0,5)
                    .map(new Function<Integer, String>() {
                        @Override public String apply(Integer integer) throws Exception {
                            return integer+"^2 = " + integer*integer ;
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override public void accept(String s) throws Exception {
                            System.out.println(s);
                        }
                    });
            //flapmap
            Integer nums1[] = new Integer[]{1,2,3,4};
            Integer nums2[] = new Integer[]{5,6,7};
            Integer nums3[] = new Integer[]{8,9,0};
            Observable.just(nums1,nums2,nums3)
                    .flatMap(new Function<Integer[], ObservableSource<Integer>>() {
                        @Override public ObservableSource<Integer> apply(Integer[] integers)
                                throws Exception {
                            return Observable.fromArray(integers);
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
        }
    

    组合(mergeWith与concatWith)

    • mergeWith:合并多个Observable发射的数据,可能会让Observable发射的数据交错。
    • concatWith:同mergeWith一样,用以合并多个Observable发射的数据,但是concatWith不会让Observable发射的数据交错。
     private static void mergeWithAndConcatWith() {
            Integer nums2[] = new Integer[]{4,5,6,7,8};
            //mergeWith
            Observable.just(1,2,3,4,5)
                    .mergeWith(Observable.fromArray(nums2))
                    .subscribe(new Consumer<Integer>() {
                        @Override public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
            //concatWith
            Observable.just(1,2,3,4,5)
                    .concatWith(Observable.fromArray(nums2))
                    .subscribe(new Consumer<Integer>() {
                        @Override public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
    
        }
    

    聚合(zipWith)

    • zipWith:将多个Obversable发射的数据,通过一个函数BiFunction对对应位置的数据处理后放到一个新的Observable中发射,所发射的数据个数与最少的Observabel中的一样多。
        private static void zipWith() {
            String names[] = new String[]{"红","橙","黄","绿","蓝","靛","紫"};
            Observable.just(1,2,3,4,5,6,7,8)
                    .zipWith(Observable.fromArray(names),new BiFunction<Integer,String,String>(){
                        @Override public String apply(Integer integer, String s) throws Exception {
                            return integer+s;
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override public void accept(String s) throws Exception {
                            System.out.println(s);
                        }
                    });
        }
    

    zipWith需要接收两个参数,
    一个是可观察对象,
    另一个是聚合函数接口BiFunction,这个接口有三个泛型,分别为第一个可观察对象发射的数据类型,第二个可观察对象发射的数据类型,经过聚合函数apply处理后返回的数据类型

    链接使用

        private static void allOperators() {
            Integer nums1[] = new Integer[]{1,3,7,8,9};
            Integer nums2[] = new Integer[]{3,4,5,6};
            String  names[] = new String[]{"红","橙","黄","绿","蓝","靛","紫"};
            Observable.just(nums1)
                    .flatMap(new Function<Integer[], Observable<Integer>>() {
                        @Override public Observable<Integer> apply(Integer[] integers)
                                throws Exception {
                            return Observable.fromArray(integers);
                        }
                    })
                    .mergeWith(Observable.fromArray(nums2))
                    .concatWith(Observable.just(1,2))
                    .distinct()
                    .filter(new Predicate<Integer>() {
                        @Override public boolean test(Integer integer) throws Exception {
                            return integer<5;
                        }
                    })
                    .map(new Function<Integer, String>() {
                        @Override public String apply(Integer integer) throws Exception {
                            return integer+":";
                        }
                    })
                    .zipWith(Observable.fromArray(names), new BiFunction<String, String, String>() {
                        @Override public String apply(String s, String s2) throws Exception {
                            return s+s2;
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override public void accept(String s) throws Exception {
                            System.out.println(s);
                        }
                    });
        }
    

    git目录
    Rxjava第一篇RxJava第一篇,RxJava入门
    Rxjava第三篇RxJava第三篇,调度器Scheduler

    相关文章

      网友评论

          本文标题:RxJava第二篇,操作符Opreators

          本文链接:https://www.haomeiwen.com/subject/mylkbftx.html