呕心沥血:RxJava2.x变换操作符

作者: Burning燃烧 | 来源:发表于2019-12-08 18:49 被阅读0次

    RxJava的基本流程分析可以参考之前的文章
    https://www.jianshu.com/p/2adaea7237c4

    RxJava创建操作符讲解
    https://www.jianshu.com/p/376fb237d93c

    1、序言

    RxJava除了拥有逻辑简洁的事件流链式调用,使用简单外其丰富的操作符基本可以满足日常开发中的各种实现逻辑

    Rx的基本操作符分类


    RxJava操作符.jpg

    下面会逐一讲解RxJava的变换操作符

    2、变换操作符

    RxJava变换操作符分类


    RxJava变换操作符.jpg

    2.1、map

    作用:将输入事件流中的内容逐一的进行修改(Function中定义如何修改)后输出

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    if (!e.isDisposed()) {
                        e.onNext(1);
                        e.onNext(2);
                        e.onNext(3);
                    }
                }
            })
                    .map(new Function<Integer, String>() {
                        @Override
                        public String apply(Integer integer) throws Exception {
                            return String.valueOf(integer);
                        }
                    })
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(String s) {
                            LogUtils.showLog("s == " + s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
    结果如下:
    12-05 16:49:41.091 15476-15476/? D/hzfTag1205: s == 1
        s == 2
        s == 3
    

    输入的事件中都是integer类型的,经过Function变换后,将每一个数据修改为String类型的并输出

    2.2、flatMap

    作用:创建一个被观察者,并将事件拆分单独转换后,再合并成为一个新的事件序列。

    Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    if (!e.isDisposed()) {
                        e.onNext(1);
                        e.onNext(2);
                        e.onNext(3);
                    }
                }
            }).flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    final List<String> list = new ArrayList<>();
                    for (int i = 0; i < 3; i++) {
                        list.add("事件 " + integer + ",拆分后的事件 " + i);
                    }
                    return Observable.fromIterable(list);
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    LogUtils.showLog("s == " + s);
                }
            });
    
    输出结果:
        s == 事件 1,拆分后的事件 0
        s == 事件 1,拆分后的事件 1
        s == 事件 1,拆分后的事件 2
        s == 事件 2,拆分后的事件 0
        s == 事件 2,拆分后的事件 1
        s == 事件 2,拆分后的事件 2
        s == 事件 3,拆分后的事件 0
        s == 事件 3,拆分后的事件 1
        s == 事件 3,拆分后的事件 2
    

    注:新合并输出的时间序列顺序与旧发送事件的顺序无关;不一定是与旧发送事件序列的顺序相同;如果想要顺序一致,可以用ConcatMap

    Map和FlatMap的区别图解
    map操作符是被观察者单独进行处理并发送,如下图:


    RxJava map操作符.png

    FlatMap操作符:
    为每一个事件创建一个新的Observable被观察者,并将每个原始事件转换后的新事件放入到对应的Observable对象中;将新建的每一个Observable都合并到一个新建的总的Observable中;最后通过这个总的Observable将事件序列发送给观察者Observer


    RxJava FlatMap.png

    2.3、concatMap

    作用:concatMap与flatMap的用法一致;区别在于拆分重新合并生成的事件序列与被观察者生产的事件序列一致
    这里不做过多赘述

    2.4、buffer

    作用:定期从被观察者Observable需要发送的数据中 获取一定数量的事件放到缓存区中,最终发送。

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    if (!emitter.isDisposed()){
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onNext(4);
                        emitter.onNext(5);
                    }
                }
            })
                    .buffer(2,1)
                    .subscribe(new Consumer<List<Integer>>() {
                        @Override
                        public void accept(List<Integer> integers) throws Exception {
                            for (Integer i : integers){
                                LogTestUtils.showLog("i == "+i);
                            }
                        }
                    });
    
    输出结果:
    2019-12-08 18:44:21.660 25128-25128/com.hzf.test D/hzfTag1208: i == 1
    2019-12-08 18:44:21.660 25128-25128/com.hzf.test D/hzfTag1208: i == 2
    2019-12-08 18:44:21.660 25128-25128/com.hzf.test D/hzfTag1208: i == 2
    2019-12-08 18:44:21.660 25128-25128/com.hzf.test D/hzfTag1208: i == 3
    2019-12-08 18:44:21.661 25128-25128/com.hzf.test D/hzfTag1208: i == 3
    2019-12-08 18:44:21.661 25128-25128/com.hzf.test D/hzfTag1208: i == 4
    2019-12-08 18:44:21.661 25128-25128/com.hzf.test D/hzfTag1208: i == 4
    2019-12-08 18:44:21.661 25128-25128/com.hzf.test D/hzfTag1208: i == 5
    

    buffer的参数,buffer(2,1),意味缓冲区的大小为2,步长为1;因此输出就是1,2;2,3;3,4......

    相关文章

      网友评论

        本文标题:呕心沥血:RxJava2.x变换操作符

        本文链接:https://www.haomeiwen.com/subject/yqvdgctx.html