美文网首页RxJava
RxJava<第十五篇>:组合/合并操作符

RxJava<第十五篇>:组合/合并操作符

作者: NoBugException | 来源:发表于2019-03-22 17:35 被阅读2次
    (1)concat和concatArray

    组合多个被观察者一起发送数据,合并后 按发送顺序串行执行

        List<Observable<Integer>> list = new ArrayList<>();
        list.add(Observable.just(1,2));
        list.add(Observable.just(3, 4));
        list.add(Observable.just(5, 6));
        Observable.concat(list)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
    
    
        Observable.concatArray(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
    

    以上两种方式的合并,返回结果是:1 2 3 4 5 6

    (2)merge和mergeArray

    组合多个被观察者一起发送数据,合并后 按时间线并行执行

        List<Observable<Integer>> list = new ArrayList<>();
        list.add(Observable.just(1, 2).delay(2000, TimeUnit.MILLISECONDS));
        list.add(Observable.just(3, 4));
        list.add(Observable.just(5, 6));
        list.add(Observable.just(7, 8));
        Observable
                .merge(list)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("aaa", String.valueOf(integer));
                    }
                });
    
    
        Observable
                .mergeArray(Observable.just(1, 2).delay(2000, TimeUnit.MILLISECONDS), Observable.just(3, 4), Observable.just(5, 6), Observable.just(7, 8))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("aaa", String.valueOf(integer));
                    }
                });
    

    以上两种方法的返回结果是:3 4 5 6 7 8 1 2

    (3)concatDelayError和mergeDelayError

    concatDelayError:多个Observable合并,并按顺序发射数据, 如果发生异常,则不会立即中断发射数据,异常将延迟发射。
    mergeDelayError:多个Observable合并,并行发射数据, 如果发生异常,则不会立即中断发射数据,异常将延迟发射。

        List<Observable<Integer>> list = new ArrayList<>();
        list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException("exception"));
                e.onComplete();
            }
        }).delay(2000, TimeUnit.MILLISECONDS));
    
        list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(3);
                e.onNext(4);
                e.onError(new NullPointerException("exception"));
                e.onComplete();
            }
        }));
    
        list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(5);
                e.onNext(6);
                e.onError(new NullPointerException("exception"));
                e.onComplete();
            }
        }));
    
        Observable
                .concatDelayError(list)
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d("aaa", "发生了异常");
            }
        });
    

    返回结果是:

    图片.png
    (4)zip

    合并多个被观察者的数据流, 然后发送(Emit)最终合并的数据。(数据和数据之间是一对一的关系)

        Observable observable1=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                SystemClock.sleep(1000);
                e.onNext(2);
                SystemClock.sleep(1000);
                e.onNext(3);
                SystemClock.sleep(1000);
                e.onNext(4);
                SystemClock.sleep(1000);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io());
    
        Observable observable2=Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("A");
                SystemClock.sleep(1000);
                e.onNext("B");
                SystemClock.sleep(1000);
                e.onNext("C");
                SystemClock.sleep(1000);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io());
    
        Observable.zip(observable1, observable2, new BiFunction<Integer,String,String>() {
            @Override
            public String apply(Integer a,String b) throws Exception {
                return a+b;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("aaa", s);
            }
        });
    

    返回结果:

    图片.png
    (5)combineLatest

    按照同一时间线来进行合并。

        Observable observable1=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                SystemClock.sleep(700);
                e.onNext(1);
                SystemClock.sleep(700);
                e.onNext(2);
                SystemClock.sleep(700);
                e.onNext(3);
                SystemClock.sleep(700);
                e.onNext(4);
                SystemClock.sleep(700);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io());
    
        Observable observable2=Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("A");
                SystemClock.sleep(600);
                e.onNext("B");
                SystemClock.sleep(600);
                e.onNext("C");
                SystemClock.sleep(600);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io());
    
        Observable.combineLatest(observable1, observable2, new BiFunction<Integer,String,String>() {
            @Override
            public String apply(Integer a,String b) throws Exception {
                return a+b;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("aaa", s);
            }
        });
    

    接下来,根据代码,我们来画一张图。

    画两个时间线observable1和observable2,根据代码中指定的时间画时间线,最后观察两个被观察者时间线重合的地方。

    图片.png

    实际上代码输出的结果也是:

    图片.png
    (6)combineLatestDelayError

    作用类似于concatDelayError() / mergeDelayError() ,即错误处理,上面已经介绍过类似的了。

    (7)reduce

    把被观察者需要发送的事件聚合成1个事件 & 发送

        Observable.just(1,2,3,4,5)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("aaa", String.valueOf(integer));
                    }
                });
    
    (8)collect

    将被观察者Observable发送的数据事件收集到一个数据结构里

        Observable
                .just(1, 2, 3, 4, 5, 6)
                .collect(new Callable<ArrayList<Integer>>() {
    
                    @Override
                    public ArrayList<Integer> call() throws Exception {
                        return new ArrayList();
                    }
                }, new BiConsumer<ArrayList<Integer>, Integer>() {
                    @Override
                    public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                        list.add(integer);
                    }
                }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(ArrayList<Integer> list) throws Exception {
                for (int result : list){
                    Log.d("aaa", String.valueOf(result));
                }
            }
        });
    

    执行结果:

    图片.png
    (9)startWith和startWithArray

    startWith: 在已有数据流之前追加一个或一组数据流。

    图片.png

    startWith可以传递的参数是:一个数据,一个数据列表,一个Observable。

        Observable.just(1, 2, 3)
                .startWith(4)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("aaa", String.valueOf(integer));
                    }
                });
    

    返回结果: 4 1 2 3

    startWithArray:在已有数据流之前追加一组数据流。

    图片.png
        Observable.just(1, 2, 3)
                .startWithArray(4, 5)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("aaa", String.valueOf(integer));
                    }
                });
    

    返回结果:4 5 1 2 3

    (10)count

    统计被观察者发送事件的数量

        Observable.just(1, 2, 3, 4)
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d("aaa", String.valueOf(aLong));
                    }
                });
    

    返回结果:4

    相关文章

      网友评论

        本文标题:RxJava<第十五篇>:组合/合并操作符

        本文链接:https://www.haomeiwen.com/subject/qrnlvqtx.html