美文网首页
Rxjava2 操作符 - Transforming Obser

Rxjava2 操作符 - Transforming Obser

作者: joker_fu | 来源:发表于2017-10-23 20:56 被阅读151次

    本教程均是基于java的项目:

    Buffer — 周期性收集Obserable产生结果到集合中,并一次性发送它。

        private static void buffer() {
            ArrayList<Integer> list = new ArrayList<>();
            for (int i = 1; i < 11; i++) {
                list.add(i);
            }
            Observable
                    .fromIterable(list)
                    .buffer(2, 3) //一次收集2个,下次跳过3个收集
                    .subscribe(new Consumer<List<Integer>>() {
                        @Override
                        public void accept(List<Integer> integers) throws Exception {
                            for (Integer integer : integers) {
                                System.out.println("accept: " + integer);
                            }
                        }
                    });
        }
    

    输出结果:

    accept: 1
    accept: 2
    accept: 4
    accept: 5
    accept: 7
    accept: 8
    accept: 10

    FlatMap — 可以应用一个函数把Observable事件转换到Observables,然后再通过一个Obserable发射出去,需要注意flatMap 并不能保证事件的顺序。

        private static void flatMap() {
            Observable
                    .just(1, 2, 3, 4, 5)
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .flatMap(new Function<Integer, ObservableSource<String>>() {
                        @Override
                        public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                            System.out.println(Thread.currentThread() + " apply " + integer);
                            return Observable.just("this is " + integer).delay(3, TimeUnit.SECONDS);
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println(Thread.currentThread() + " accept: " + s);
                        }
                    });
            while (true) ;
        }
    

    输出结果:

    Thread[RxCachedThreadScheduler-2,5,main] apply 1
    Thread[RxCachedThreadScheduler-2,5,main] apply 2
    Thread[RxCachedThreadScheduler-2,5,main] apply 3
    Thread[RxCachedThreadScheduler-2,5,main] apply 4
    Thread[RxCachedThreadScheduler-2,5,main] apply 5
    Thread[RxComputationThreadPool-2,5,main] accept: this is 2
    Thread[RxComputationThreadPool-2,5,main] accept: this is 3
    Thread[RxComputationThreadPool-4,5,main] accept: this is 4
    Thread[RxComputationThreadPool-1,5,main] accept: this is 1
    Thread[RxComputationThreadPool-1,5,main] accept: this is 5

    可以看到我们apply的时候是1 2 3 4 5,订阅收到的确是2 3 4 1 5,可能下一次运行又不是这个顺序了,需要保证顺序则可以使用 concatMap替换flatMap。

    Map — 通过一个函数(apply)转换通过Observable发射的项目。

        private static void map() {
            Observable
                    .just(1, 2, 3, 4, 5)
                    .map(new Function<Integer, String>() {
                        @Override
                        public String apply(@NonNull Integer integer) throws Exception {
                            return "result " + integer;
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println("accept " + s);
                        }
                    });
        }
    

    输出结果:

    accept result 1
    accept result 2
    accept result 3
    accept result 4
    accept result 5

    GroupBy — 对源Observable结果分组,转换成GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值。
    由于GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。所以你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。

        private static void groupBy() {
            Observable
                    .range(0, 9)
                    .groupBy(new Function<Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer value) throws Exception {
                            return value % 3;   //返回值决定组名 这里分了0 1 2三组
                        }
                    })
                    .subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
                        @Override
                        public void accept(final GroupedObservable<Integer, Integer> res) throws Exception {
                            res.subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer aLong) throws Exception {
                                    System.out.println("group: " + res.getKey() + " - " + aLong);
                                }
                            });
                        }
                    });
        }
    

    输出结果:

    group: 0 - 0
    group: 1 - 1
    group: 2 - 2
    group: 0 - 3
    group: 1 - 4
    group: 2 - 5
    group: 0 - 6
    group: 1 - 7
    group: 2 - 8

    Scan — scan对迭代源Observable产生的结果应用一个函数,将结果发射出去并作为下次迭代的一个参数。

        private static void scan() {
            Observable
                    .just(1, 2, 3, 4, 5)
                    .scan(new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer sum, @NonNull Integer item) throws Exception {
                            System.out.println("sum " + sum);
                            return sum + item;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println("Next: " + integer);
                        }
                    });
        }
    

    输出结果:

    Next: 1
    sum 1
    Next: 3
    sum 3
    Next: 6
    sum 6
    Next: 10
    sum 10
    Next: 15

    Window — 类似于buffer,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理

        private static void window() {
            ArrayList<Integer> list = new ArrayList<>();
            for (int i = 1; i < 11; i++) {
                list.add(i);
            }
            Observable
                    .fromIterable(list)
                    .window(2, 3)
                    .subscribe(new Consumer<Observable<Integer>>() {
                        @Override
                        public void accept(Observable<Integer> ob) throws Exception {
                            ob.subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    System.out.println("accept: " + integer);
                                }
                            });
                        }
                    });
    
        }
    

    输出结果:

    accept: 1
    accept: 2
    accept: 4
    accept: 5
    accept: 7
    accept: 8
    accept: 10

    相关文章

      网友评论

          本文标题:Rxjava2 操作符 - Transforming Obser

          本文链接:https://www.haomeiwen.com/subject/oycnuxtx.html