美文网首页
转换操作符

转换操作符

作者: rkua | 来源:发表于2016-11-22 17:04 被阅读0次
    buffer操作符

    buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给订阅者,订阅者处理后,清空buffer列表,同时接收下一次收集的结果并提交给订阅者,周而复始。
    一旦源Observable在产生结果的过程中出现异常,即使buffer已经存在收集到的结果,订阅者也会马上收到这个异常,并结束整个过程。
    buffer操作符

    //定义邮件内容
            final String[] mails = new String[]{"Here is an email!", "Another email!", "Yet another email!"};
            //每隔1秒就随机发布一封邮件
            Observable<String> endlessMail = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    try {
                        if (subscriber.isUnsubscribed()) return;
                        Random random = new Random();
                        while (true) {
                            String mail = mails[random.nextInt(mails.length)];
                            subscriber.onNext(mail);
                            Thread.sleep(1000);
                        }
                    } catch (Exception ex) {
                        subscriber.onError(ex);
                    }
                }
            }).subscribeOn(Schedulers.io());
            //把上面产生的邮件内容缓存到列表中,并每隔3秒通知订阅者
            endlessMail.buffer(3, TimeUnit.SECONDS).subscribe(new Action1<List<String>>() {
                @Override
                public void call(List<String> list) {
    
                    System.out.println(String.format("You've got %d new messages!  Here they are!", list.size()));
                    for (int i = 0; i < list.size(); i++)
                        System.out.println("**" + list.get(i).toString());
                }
            });
    

    运行结果如下:
    You’ve got 3 new messages! Here they are!(after 3s)
    **Here is an email!
    **Another email!
    **Another email!
    You’ve got 3 new messages! Here they are!(after 6s)
    **Here is an email!
    **Another email!
    **Here is an email!
    ……


    flatMap操作符

    flatMap操作符是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。
    flatMap操作符通过传入一个函数作为参数转换源Observable,在这个函数中,你可以自定义转换规则,最后在这个函数中返回一个新的Observable,然后flatMap操作符通过合并这些Observable结果成一个Observable,并依次提交结果给订阅者。
    flatMap操作符在合并Observable结果时,有可能存在交叉的情况

    private Observable<File> listFiles(File f){
            if(f.isDirectory()){
                return Observable.from(f.listFiles()).flatMap(new Func1<File, Observable<File>>() {
                    @Override
                    public Observable<File> call(File file) {
                        return listFiles(f);
                    }
                });
            } else {
                return Observable.just(f);
            }
        }
    
    
        @Override
        public void onClick(View v) {
            Observable.just(getApplicationContext().getExternalCacheDir())
                    .flatMap(new Func1<File, Observable<File>>() {
                        @Override
                        public Observable<File> call(File file) {
                            //参数file是just操作符产生的结果,这里判断file是不是目录文件,如果是目录文件,则递归查找其子文件flatMap操作符神奇的地方在于,返回的结果还是一个Observable,而这个Observable其实是包含多个文件的Observable的,输出应该是ExternalCacheDir下的所有文件
                            return listFiles(file);
                        }
                    })
                    .subscribe(new Action1<File>() {
                        @Override
                        public void call(File file) {
                            System.out.println(file.getAbsolutePath());
                        }
                    });
    
        }
    
    concatMap操作符

    cancatMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。
    与flatMap操作符不同的是,concatMap操作符在处理产生的Observable时,采用的是“连接(concat)”的方式,而不是“合并(merge)”的方式,这就能保证产生结果的顺序性,也就是说提交给订阅者的结果是按照顺序提交的,不会存在交叉的情况。

    switchMap操作符

    switchMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。
    与flatMap操作符不同的是,switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果,举个例子来说,比如源Observable产生A、B、C三个结果,通过switchMap的自定义映射规则,映射后应该会产生A1、A2、B1、B2、C1、C2,但是在产生B2的同时,C1已经产生了,这样最后的结果就变成A1、A2、B1、C1、C2,B2被舍弃掉了!

    以下是flatMap、concatMap和switchMap的运行实例对比:

            //flatMap操作符的运行结果
            Observable.just(10, 20, 30).flatMap(new Func1<Integer, Observable<Integer>>() {
                @Override
                public Observable<Integer> call(Integer integer) {
                    //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
                    int delay = 200;
                    if (integer > 10)
                        delay = 180;
    
                    return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
                }
            }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.println("flatMap Next:" + integer);
                }
            });
    
            //concatMap操作符的运行结果
            Observable.just(10, 20, 30).concatMap(new Func1<Integer, Observable<Integer>>() {
                @Override
                public Observable<Integer> call(Integer integer) {
                    //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
                    int delay = 200;
                    if (integer > 10)
                        delay = 180;
    
                    return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
                }
            }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.println("concatMap Next:" + integer);
                }
            });
    
            //switchMap操作符的运行结果
            Observable.just(10, 20, 30).switchMap(new Func1<Integer, Observable<Integer>>() {
                @Override
                public Observable<Integer> call(Integer integer) {
                    //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
                    int delay = 200;
                    if (integer > 10)
                        delay = 180;
    
                    return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
                }
            }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.println("switchMap Next:" + integer);
                }
            });
    

    运行结果如下:
    flatMap Next:20
    flatMap Next:10
    flatMap Next:30
    flatMap Next:15
    flatMap Next:10
    flatMap Next:5

    switchMap Next:30
    switchMap Next:15

    concatMap Next:10
    concatMap Next:5
    concatMap Next:20
    concatMap Next:10
    concatMap Next:30
    concatMap Next:15


    groupBy操作符

    groupBy操作符是对源Observable产生的结果进行分组,形成一个类型为GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值(类似于HashMap的key)。
    值得注意的是,由于结果集中的GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。因此,如果你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。
    groupBy操作符的流程图如下

    调用例子如下:

    Observable.interval(1, TimeUnit.SECONDS).take(10).groupBy(new Func1<Long, Long>() {
                @Override
                public Long call(Long value) {
                    //按照key为0,1,2分为3组
                    return value % 3;
                }
            }).subscribe(new Action1<GroupedObservable<Long, Long>>() {
                @Override
                public void call(GroupedObservable<Long, Long> result) {
                    result.subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long value) {
                            System.out.println("key:" + result.getKey() +", value:" + value);
                        }
                    });
                }
            });
    

    运行结果如下:
    key:0, value:0
    key:1, value:1
    key:2, value:2
    key:0, value:3
    key:1, value:4
    key:2, value:5
    key:0, value:6
    key:1, value:7
    key:2, value:8
    key:0, value:9


    map操作符

    map操作符是把源Observable产生的结果,通过映射规则转换成另一个结果集,并提交给订阅者进行处理。

    Observable.just(1,2,3,4,5,6).map(new Func1<Integer, Integer>() {
                @Override
                public Integer call(Integer integer) {
                    //对源Observable产生的结果,都统一乘以3处理
                    return integer*3;
                }
            }).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.println("next:" + integer);
                }
            });
    

    运行结果如下:
    next:3
    next:6
    next:9
    next:12
    next:15
    next:18


    cast操作符

    cast操作符类似于map操作符,不同的地方在于map操作符可以通过自定义规则,把一个值A1变成另一个值A2,A1和A2的类型可以一样也可以不一样;而cast操作符主要是做类型转换的,传入参数为类型class,如果源Observable产生的结果不能转成指定的class,则会抛出ClassCastException运行时异常。

            Observable.just(1,2,3,4,5,6).cast(Integer.class).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer value) {
                    System.out.println("next:"+value);
                }
            });
    

    运行结果如下:
    next:1
    next:2
    next:3
    next:4
    next:5
    next:6


    scan操作符

    scan操作符通过遍历源Observable产生的结果,依次对每一个结果项按照指定规则进行运算,计算后的结果作为下一个迭代项参数,每一次迭代项都会把计算结果输出给订阅者。

    Observable.just(1, 2, 3, 4, 5)
        .scan(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer sum, Integer item) {
                //参数sum就是上一次的计算结果
                return sum + item;
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
    

    运行结果如下:
    Next: 1
    Next: 3
    Next: 6
    Next: 10
    Next: 15
    Sequence complete.


    window操作符

    window操作符非常类似于buffer操作符,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的结果是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理。

    Observable.interval(1, TimeUnit.SECONDS).take(12)
                    .window(3, TimeUnit.SECONDS)
                    .subscribe(new Action1<Observable<Long>>() {
                        @Override
                        public void call(Observable<Long> observable) {
                            System.out.println("subdivide begin......");
                            observable.subscribe(new Action1<Long>() {
                                @Override
                                public void call(Long aLong) {
                                    System.out.println("Next:" + aLong);
                                }
                            });
                        }
                    });
    

    运行结果如下:
    subdivide begin……
    Next:0
    Next:1
    subdivide begin……
    Next:2
    Next:3
    Next:4
    subdivide begin……
    Next:5
    Next:6
    Next:7
    subdivide begin……
    Next:8
    Next:9
    Next:10
    subdivide begin……
    Next:11


    相关文章

      网友评论

          本文标题:转换操作符

          本文链接:https://www.haomeiwen.com/subject/qatmpttx.html