美文网首页
RxJava学习之转换型操作符

RxJava学习之转换型操作符

作者: 菜鸟_一枚 | 来源:发表于2017-03-09 21:57 被阅读138次

    RxJava学习之转换型操作符

    标签(空格分隔): RX系列


    转换型操作符

    下面展示了可用于Observable发射的数据执行变换操作的各种操作符

    • map()---对序列的每一项都应用一个函数来变换Observable发射的数据序列
    • flatMap()、concatMap()、flatMapIterable()---将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
    • switchMap()---将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
    • scan()---对Observable发射的每一项数据应用一个函数,然后按照顺序依次发射每一个值
    • groupyBy()---将Observable分拆为Observable集合,将原始的Observable发射的数据按照key分组,每一个Observable发射一组不同的数据
    • buffer()---它顶起从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射
    • window()---定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
    • cast()---在发射之前强制将Observable发射的所有哦数据转换为指定数据类型

    map操作符


    对Observable发射的每一项数据应用一个函数,执行变换操作
    Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。

    RxJava将这个操作符实现为map函数。这个操作符默认不在任何特定的调度器上执行。

    Map操作符的源码
    /**
         * Returns an Observable that applies a specified function to each item emitted by the source Observable and
         * emits the results of these function applications.
         * <p>
         * ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/map.png)
         * <dl>
         *  <dt><b>Scheduler:</b></dt>
         *  <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd>
         * </dl>
         * 
         * @param func
         *            a function to apply to each item emitted by the Observable
         * @return an Observable that emits the items from the source Observable, transformed by the specified
         *         function
         * @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
         */
        public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            return lift(new OperatorMap<T, R>(func));
        }
        
        //可以看到接受一个Func1函数
    /**
     * Represents a function with one argument.
     * @param <T> the first argument type  作为参数类型
     * @param <R> the result type  作为返回参数类型
     */
    public interface Func1<T, R> extends Function {
        R call(T t);
    }
        
    

    map例子

    static List<Student> studentList = new ArrayList<Student>(){
            {
                add(new Student("FAALLDA", 28));
                add(new Student("小弟弟", 23));
                add(new Student("妻子的孜孜", 25));
            }
        };
        
    
        /**
         * Map
         * 
         * 通过使用map中的方法对Observable中发射出来的所有数据进行变换
         * 
         * test1()方法是得到多个Student对象中的name,保存到nameList中
         * 注意:接口Func1包装的是有返回值的方法。
         * 
         */
        private static void test1(){
            List<String> nameList = new ArrayList<>();
            Observable.from(studentList)
            .map(new Func1<Student, String>() {
    
                @Override
                public String call(Student student) {
                    return student.name;
                }
            })
            .subscribe(new Subscriber<String>() {
    
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted nameList.size() = " + nameList.size());     
                }
                
                @Override
                public void onNext(String value) {
                    System.out.println("onSuccess value = " + value);
                    nameList.add(value);
                }
    
                @Override
                public void onError(Throwable error) {
                    System.out.println("onError error = " + error);
                }
            });
            
        }
        
    

    Map操作符连续使用

    /**
         * Map操作符连续使用
         */
        private static void test(){
            Observable.from(studentList)
            //把student转换成Integer
            .map(new Func1<Student, Integer>() {
                
                @Override
                public Integer call(Student student) {
                    return student.age;
                }
            })
            //将Integer转换成String
            .map(new Func1<Integer, String>() {
    
                @Override
                public String call(Integer t) {
                    return String.valueOf(t+10);
                }
            })
            .subscribe(new Subscriber<String>() {
                
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted ");
                }
                
                @Override
                public void onNext(String value) {
                    System.out.println("onSuccess value = " + value);
                }
                
                @Override
                public void onError(Throwable error) {
                    System.out.println("onError error = " + error);
                }
            });
            
        }
        
    

    Flatmap操作符

    FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable


    • FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

    • 这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。

    注意:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。

    /**
         * FlatMap操作符
         * FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
         */
        private static void test4(){
            List<String> nameList = new ArrayList<>();
            Observable.from(studentList)
            //第一个Student类型使我们的flatMap返回函数call里面的参数类型
            //第二个Entity类型使我们的flatMap中的call的返回值类型
            .flatMap(new Func1<Student, Observable<Entity>>() {
                
                @Override
                public Observable<Entity> call(Student student) {
                    Course course = couseMap.get(student.name);
                    Entity entity = new Entity(course, student);
                    return Observable.just(entity);
                }
            })
            .subscribe(new Subscriber<Entity>() {
                
                @Override
                public void onCompleted() {
                }
                
                @Override
                public void onNext(Entity entity) {
                    System.out.println("onSuccess entity = " + entity);
                }
                
                @Override
                public void onError(Throwable error) {
                    System.out.println("onError error = " + error);
                }
            });
            
        }
    

    concatMap操作符

    还有一个concatMap操作符,它类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。


        /**
         * ConcatMap操作符
         * 类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。
         * 
         */
        private static void test5(){
            Observable.from(studentList)
            .concatMap(new Func1<Student, Observable<Course>>() {
    
                @Override
                public Observable<Course> call(Student t) {
                    Course course = couseMap.get(t.name);
                    return Observable.just(course);
                }
            })
            .subscribe(new Subscriber<Course>() {
                
                @Override
                public void onCompleted() {
                }
                
                @Override
                public void onNext(Course course) {
                    System.out.println("onSuccess course = " + course);
                }
                
                @Override
                public void onError(Throwable error) {
                    System.out.println("onError error = " + error);
                }
            });
        }
    

    concatMap和flatMap的区别

    /**
         * flatMap与ConcatMap操作符比较
         * 区别:
         * 无序:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。
         * 有序:ConcatMap不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据。
         * 
         * 说明:在同步线程中,FlatMap和ConcactMap的执行结果是一样的(结果是有序的),
         *      只有在异步线程中,FlatMap结果可能是无序的,而ConcactMap始终能保持有序的结果。
         * 
         * concatMap与flatMap操作符的比较 
         */
        private static void test(){
            List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
            Observable.from(numbers)
            .flatMap(new Func1<Integer, Observable<Integer>>() {
    
                @Override
                public Observable<Integer> call(Integer t) {
                    return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
                    //return Observable.just(t);
                }
            })
            .subscribe(new Subscriber<Integer>() {
                
                @Override
                public void onCompleted() {
                }
                
                @Override
                public void onNext(Integer value) {
                    System.out.println("flatMap onSuccess value = " + value);
                }
                
                @Override
                public void onError(Throwable error) {
                    System.out.println("onError error = " + error);
                }
            });
            System.out.println("----------------------------");
            Observable.from(numbers)
            .concatMap(new Func1<Integer, Observable<Integer>>() {
                
                @Override
                public Observable<Integer> call(Integer t) {
                    return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
                    //return Observable.just(t);
                }
            })
            .subscribe(new Subscriber<Integer>() {
                
                @Override
                public void onCompleted() {
                }
                
                @Override
                public void onNext(Integer value) {
                    System.out.println("concatMap onNext value = " + value);
                }
                
                @Override
                public void onError(Throwable error) {
                    System.out.println("onError error = " + error);
                }
            });
        }
        
    

    switchMap操作符

    • 它和flatMap很像,除了一点:当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前那个数据的Observable,只监视当前这一个
        /**
         * switchMap
         * 解释:将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
         * 用法与FlatMap几乎一样,区别是SwitchMap操作符只会发射[emit]最近的Observables。
         * 
         * 当源Observable发射一个新的数据项时,如果旧数据项订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项.
         * 
         * 应用场景:http://blog.csdn.net/jdsjlzx/article/details/51730162
         * 
         * 逻辑推演:
         * A --> 取消空的,没有可以取消的
         * B-->  A1被取消
         * C-->  B1被取消
         * D-->  C1被取消
         * E-->  D1被取消
         * 最终输出E1
         */
        private static void test7(){
            Observable.just("A", "B", "C", "D", "E")
            .switchMap(new Func1<String, Observable<String>>() {  
                @Override  
                public Observable<String> call(String s) {  
                    return Observable.just(s+"1").subscribeOn(Schedulers.newThread()); //并发
                    //return Observable.just(s+"1");  
                }  
            })
            .subscribe(new Observer<String>() {  
                @Override  
                public void onCompleted() {  
                    System.out.println("switchMap onCompleted");
                }  
      
                @Override  
                public void onError(Throwable e) {  
                    System.out.println("switchMap onError :" + e);
                }  
      
                @Override  
                public void onNext(String s) {  
                    System.out.println("switchMap Next :" + s);
                }  
            });  
            
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    

    switchMap与flatmap的区别

    /**
         * switchMap与flatmap的区别
         * 
         * 说明:在同步线程中,switchMap发射[emit]所有的Observables,
         *      在异步线程中,switchMap只会发射[emit]最近的Observables。
         * 
         */
        private static void test8(){
            ExecutorService service = Executors.newFixedThreadPool(10);
            List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
            Observable.from(numbers)
            .flatMap(new Func1<Integer, Observable<Integer>>() {
                
                @Override
                public Observable<Integer> call(Integer t) {
                    return Observable.just(t).subscribeOn(Schedulers.from(service));
                    //return Observable.just(t);
                }
            })
            .subscribe(new Subscriber<Integer>() {
                
                @Override
                public void onCompleted() {
                }
                
                @Override
                public void onNext(Integer value) {
                    System.out.println("flatMap onNext value = " + value);
                }
                
                @Override
                public void onError(Throwable error) {
                    System.out.println("onError error = " + error);
                }
            });
            System.out.println("----------------------------------");
            Observable.from(numbers)
            .switchMap(new Func1<Integer, Observable<Integer>>() {
                
                @Override
                public Observable<Integer> call(Integer t) {
                    return Observable.just(t).subscribeOn(Schedulers.from(service));
                    //return Observable.just(t);
                }
            })
            .subscribe(new Subscriber<Integer>() {
                
                @Override
                public void onCompleted() {
                }
                
                @Override
                public void onNext(Integer value) {
                    System.out.println("switchMap2 onNext value = " + value);
                }
                
                @Override
                public void onError(Throwable error) {
                    System.out.println("onError error = " + error);
                }
            });
            service.shutdown();
        }
        
    

    相关文章

      网友评论

          本文标题:RxJava学习之转换型操作符

          本文链接:https://www.haomeiwen.com/subject/vdvggttx.html