美文网首页RxJava
RxJava<第十一篇>:变换操作符

RxJava<第十一篇>:变换操作符

作者: NoBugException | 来源:发表于2019-03-15 01:34 被阅读3次

    (1)map

    从发射数据到接收数据之间的数据变换。

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "我是变换过后的" + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    

    以上代码的意思是,发射的数据是Integer类型的, 将Integer类型的数据经过一些处理,最后返回值是String类型。

    如果以上的说法不能理解,那么就举个例子:

        new Thread(new Runnable() {
            @Override
            public void run() {
                toUp("live");
            }
        }).start();
    
    private String toUp(String str){
        String s = str.toUpperCase();
        return s;
    }
    

    以上的列子很简单, 那么怎么写才能让代码更加优雅呢?

        Observable.just("live").map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s.toUpperCase();
            }
        }).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    

    (2)flatMap

    FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

        List<Integer> list = Arrays.asList(1, 2, 3);
    
        Observable.fromIterable(list).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(final Integer integer) throws Exception {
                Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
    
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    
                        emitter.onNext("第" + integer + "个主任务的第1个分支任务");
                        emitter.onNext("第" + integer + "个主任务的第2个分支任务");
                        emitter.onComplete();
                    }
                }).subscribeOn(Schedulers.newThread());
                return observable;
            }
        }).subscribe(new Consumer<String>() {
    
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s+"---thread:"+ Thread.currentThread().getName());
            }
        });
    

    以上代码将一个任务分成两个任务分别发射。

    执行结果如下:

    图片.png

    如果删除subscribeOn(Schedulers.newThread())代码,那么执行结果是:

    图片.png

    显然, 在多线程的情况下,接收数据时,是线程不安全的,如果需要线程安全,那么需要使用ConcatMap。

    如果以上不怎么理解,那么就举一个例子吧,请问大家有没有遇到过嵌套网络请求,当post请求成功返回数据时,这时我们需要这些数据发起新的(一个或多个)post请求。

    需要注意的是,如果第一个post请求成功返回时,第二个和第三个post请求都需要第一个post请求返回的参数时,那么是否考虑线程安全来决定到底使用flatMap还是ConcatMap。

    我们先用一般的代码模拟一下网络请求, 如下:

    public interface INR{
        void success(int result);
        void failed();
    }
    
    //模拟网络请求1
    private void networkRequest1(final String s1, final INR inr){
        new Thread(new Runnable() {
            @Override
            public void run() {
                if("1".equals(s1)){
                    int result = Integer.parseInt(s1);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        inr.failed();
                    }
                    inr.success(result);
                } else{
                    inr.failed();
                }
            }
        }).start();
    }
    
    //模拟网络请求2
    private void networkRequest2(final String s2, final INR inr){
        new Thread(new Runnable() {
            @Override
            public void run() {
                if(s2.equals("2")){
                    int result = Integer.parseInt(s2);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        inr.failed();
                    }
                    inr.success(result);
                } else{
                    inr.failed();
                }
            }
        }).start();
    }
    
    //模拟网络请求3
    private void networkRequest3(final String s3, final INR inr){
        new Thread(new Runnable() {
            @Override
            public void run() {
                if("3".equals(s3)){
                    int result = Integer.parseInt(s3);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        inr.failed();
                    }
                    inr.success(result);
                } else{
                    inr.failed();
                }
            }
        }).start();
    }
    
        networkRequest1("1", new INR() {
            @Override
            public void success(int result) {
                String newresult = String.valueOf(result + 1);
                networkRequest2(newresult, new INR() {
                    @Override
                    public void success(int result) {
                        String newresult = String.valueOf(result + 1);
                        networkRequest3(newresult, new INR() {
                            @Override
                            public void success(int result) {
                                System.out.println(String.valueOf(result));
                            }
    
                            @Override
                            public void failed() {
    
                            }
                        });
                    }
    
                    @Override
                    public void failed() {
    
                    }
                });
            }
    
            @Override
            public void failed() {
    
            }
        });
    

    一个完整的网络嵌套请求需要写那么多代码,如果可读性很差,代码不优雅,那么Rxjava到底怎么实现呢?

        Observable.just("1").flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) throws Exception {
                int result = Integer.parseInt(s);
                Thread.sleep(3000);
                return Observable.just(result).map(new Function<Integer, String>() {
    
                    @Override
                    public String apply(Integer result) throws Exception {
                        return String.valueOf(result + 1);
                    }
                });
            }
        }).flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) throws Exception {
                int result = Integer.parseInt(s);
                Thread.sleep(3000);
                return Observable.just(result).map(new Function<Integer, String>() {
    
                    @Override
                    public String apply(Integer result) throws Exception {
                        return String.valueOf(result + 1);
                    }
                });
            }
        }).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    

    以上代码就是Rxjava的写法了,是不是很简洁呢?

    接下来介绍的ConcatMap就不举例了。

    (3)ConcatMap

        List<Integer> list = Arrays.asList(1, 2, 3);
    
        Observable.fromIterable(list).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(final Integer integer) throws Exception {
                Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
    
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    
                        emitter.onNext("第" + integer + "个主任务的第1个分支任务");
                        emitter.onNext("第" + integer + "个主任务的第2个分支任务");
    
                        emitter.onComplete();
                    }
                }).subscribeOn(Schedulers.newThread());
                return observable;
            }
        }).subscribe(new Consumer<String>() {
    
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s+"---thread:"+ Thread.currentThread().getName());
            }
        });
    

    执行结果是:

    图片.png

    (4)flatMapIterable

        Observable.create(new ObservableOnSubscribe<String>() {
    
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("A");
                e.onNext("B");
            }
        }).flatMapIterable(new Function<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String s) throws Exception {
                List<String> list = Arrays.asList("A", "B", "C");
                return list;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    

    (5)switchMap

    只发射最近发射的数据,也就是说,如果前一个任务还没完成时就开始了第二个任务,那么前一个任务将被终止。

        Observable.create(new ObservableOnSubscribe<String>() {
    
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("A");
                e.onNext("B");
            }
        }).switchMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) throws Exception {
    
                return Observable.just(s, "---:"+s).subscribeOn(Schedulers.newThread());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    

    日志效果如下

    图片.png

    (6)scan

    sacn操作符是遍历源Observable产生的结果,再按照自定义规则进行运算,依次输出每次计算后的结果给订阅者

        Observable.range(2, 10).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(String.valueOf(integer));
            }
        });
    

    apply回掉第一个参数是上次的结算结果,第二个参数是当此的源observable的输入值

    日志如下:

    图片.png

    (7)groupBy

    将1,2,3,4分组

        Observable.just(1,2,3,4).groupBy(new Function<Integer, Integer>() {
    
            @Override
            public Integer apply(Integer integer) throws Exception {
                return integer % 2;
            }
        }).subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
            @Override
            public void accept(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
                integerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integerIntegerGroupedObservable.getKey() + "    "+integer);
                    }
                });
            }
        });
    

    日志如下:

    图片.png

    (8)buffer

    字面上是缓存的意思

    假如,现在有"one", "two", "three", "four", "five"这5条数据,将将要发射的数据存放到缓存区,每个缓存区是3条数据:
    缓存区1:"one", "two", "three"
    缓存区2:"four", "five"

        Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
                .buffer(3)
                .subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> strings) throws Exception {
                        for (String s : strings){
                            System.out.println(s);
                        }
                        System.out.println("---------------------");
                    }
                });
    

    有5个字符串,设置缓存区为3(默认跳3个字符串)

    日志效果如下:

    图片.png

    假如,现在有"one", "two", "three", "four", "five"这5条数据,将将要发射的数据存放到缓存区,每个缓存区是3条数据,设置偏移量为1

    缓存区1:"one", "two", "three"
    缓存区2: "two", "three", "four"
    缓存区3: "three", "four", "five"
    缓存区4:"four", "five"
    缓存区5: "five"

        Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
                .buffer(3,1)
                .subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> strings) throws Exception {
                        for (String s : strings){
                            System.out.println(s);
                        }
                        System.out.println("---------------------");
                    }
                });
    

    打印日志如下:

    图片.png

    (9)window

    设置一个window最多3条数据,将这个window封装成Observable,并将Observable发射出去。

        Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
                .window(3)
                .subscribe(new Consumer<Observable<String>>() {
                    @Override
                    public void accept(Observable<String> stringObservable) throws Exception {
                        System.out.println("-----------------------");
                        stringObservable.subscribe(new Consumer<String>() {
                            @Override
                            public void accept(String s) throws Exception {
                                System.out.println(s);
                            }
                        });
                    }
                });
    

    执行效果如下

    图片.png
        Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
                .window(3,1)
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Observable<String>>() {
                    @Override
                    public void accept(Observable<String> stringObservable) throws Exception {
                        System.out.println("-----------------------");
                        stringObservable.subscribe(new Consumer<String>() {
                            @Override
                            public void accept(String s) throws Exception {
                                System.out.println(s);
                            }
                        });
                    }
                });
    

    效果如下:

    图片.png

    需要说明的是:

    • window(long count) 被分割成的每个window最大数据值
    • window(long count, long skip) count 每个window最大数据值,skip步长
    • window与buffer区别:window是把数据分割成了Observable,buffer是把数据分割成List

    (10)cast

    cast是转换操作符, 从字面上的意思是说可以实现类型的转换

        Observable.range(1, 5).cast(Integer.class).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer ss) throws Exception {
                System.out.println(ss);
            }
        });
    
        Observable.range(1, 5).cast(Number.class).subscribe(new Consumer<Number>() {
            @Override
            public void accept(Number ss) throws Exception {
                System.out.println(ss);
            }
        });
    

    打印效果如下:

    图片.png

    如果改成String类型

        Observable.range(1, 5).cast(String.class).subscribe(new Consumer<String>() {
            @Override
            public void accept(String ss) throws Exception {
                System.out.println(ss);
            }
        });
    

    打印效果如下:

    图片.png

    这是一个比较有疑问的地方。

    相关文章

      网友评论

        本文标题:RxJava<第十一篇>:变换操作符

        本文链接:https://www.haomeiwen.com/subject/urpcmqtx.html