美文网首页Android实战
RxJava2.0 操作符(8)—— Mathematical

RxJava2.0 操作符(8)—— Mathematical

作者: DoubleThunder | 来源:发表于2017-10-16 23:15 被阅读129次

    本页展示的操作符用于对整个序列执行算法操作或其它操作,由于这些操作必须等待数据发射完成(通常也必须缓存这些数据),它们对于非常长或者无限的序列来说是危险的,不推荐使用。

    8.1 Concat

    不交错的发射两个或多个 Observable 的发射物。

    ConcatConcat

    Concat 操作符连接多个 Observable 的输出,就好像它们是一个 Observable,第一个 Observable 发射的所有数据在第二个 Observable 发射的任何数据前面,以此类推。

    直到前面一个 Observable 终止,Concat 才会订阅额外的一个 Observable。注意:因此,如果你尝试连接一个"热" Observable(这种 Observable 在创建后立即开始发射数据,即使没有订阅者),Concat 将不会看到也不会发射它之前发射的任何数据。

    在 Rxjava2.0 中实现了多种 Concat 的操作符。

    8.1.1 Concat / ConcatWith

    顺序连接多个 Observables,并且严格按照发射顺序,前一个没有发射完,是不能发射后面的。


    concatconcat
    两者等价
    Observable.concat(ob1,ob2);
    ob1.concatWith(ob2)。
    

    示例代码:

    Observable<Integer> ob1 = Observable.just(10,1,11);
    Observable<Integer> ob2 = Observable.just(3, 8);
    Observable<Integer> ob3 = Observable.just(5, 4);
    Observable.concat(ob1, ob2,ob3).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "accept:" + integer);
        }
    });
    

    输出结果:

    accept:10
    accept:1
    accept:11
    accept:3
    accept:8
    accept:5
    accept:4
    

    8.1.2 concatArray

    连接可变数量的 Observable 源。

    concatArrayconcatArray

    concat 操作符内部其实是调用 ConcatArray 的方法。由于与concat(ObservableSource)的重载冲突,以这种方式命名。

    示例代码:

    Observable<Integer> ob1 = Observable.just(10,1,11);
    Observable<Integer> ob2 = Observable.just(3, 8);
    Observable<Integer> ob3 = Observable.just(5, 4);
    
    Observable.concatArray(ob1, ob2, ob3).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "accept:" + integer);
        }
    });
    

    输出结果:

    accept:10
    accept:1
    accept:11
    accept:3
    accept:8
    accept:5
    accept:4
    

    8.1.2 concatArrayDelayError / ConcatDelayError

    顺序连接多个 Observables,并且严格按照发射顺序,如果其中有发送 OnError(),此延迟其发送 onError (),直到所有发射结束 ;

    concatArrayDelayErrorconcatArrayDelayError
    两者等价
    Observable.concatDelayError(Observable.fromArray(ob1,ob2,ob3));
    Observable.concatArrayDelayError(ob1, ob2, ob3);
    

    示例代码:

    Observable<Integer> ob1 = Observable.just(1, 11);
    Observable<Integer> ob2 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(3);
            emitter.onError(new Throwable("throwable 2"));
            emitter.onNext(8);
        }
    });
    Observable<Integer> ob3 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(5);
            emitter.onError(new Throwable("throwable 3"));
            emitter.onComplete();
        }
    });
    
    Observable.concatArrayDelayError(ob1, ob2, ob3).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
    
        }
    
        @Override
        public void onNext(@NonNull Integer integer) {
            Log.e(TAG, "onNext:"+integer);
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            Log.e(TAG, "onError:" + e.getMessage());
        }
    
        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete");
        }
    });
    

    输出结果:

    onNext:1
    onNext:11
    onNext:3
    onNext:5
    onError:2 exceptions occurred.
    

    8.1.3 ConcatArrayEager / ConcatEager

    将 Observable 序列紧紧连接到单个值流中。该级联一旦订阅,此运营商订阅所有源 Observable。 操作员缓冲这些 Observable 发出的值,然后依次排列它们,每一个都在前一个完成之后。


    ConcatEagerConcatEager
    //两者等价
    Observable.ConcatEager(Observable.fromArray(ob1,ob2,ob3));
    Observable.ConcatArrayEager(ob1, ob2, ob3);
    

    8.2 Reduce

    按顺序对 Observable 发射的每项数据应用一个函数并发射最终的值。

    ReduceReduce

    在 Rxjava2.0 中实现了多种 Reduce 的操作符:

    8.2.1 collect / collectInto

    将源 Observable 发送的项目收集到单个可变数据结构中,并返回发出此结构的 Single。

    collectcollect

    这是一个简化版本的 reduce,不需要在每次通过时返回状态。

    //两者等价
    collectInto("beauty",collector);
    collect(Functions.justCallable("beauty"), collector);
    

    示例代码 1:

    Observable.just(2,7,11).collect(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "beauty";
            }
        }, new BiConsumer<String, Integer>() {
            @Override
            public void accept(String s, Integer i2) throws Exception {
                Log.e(TAG, "i1 = " + s + ",i2 = " + i2);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "accept:" + s);
            }
        });
    

    示例代码 2:

    Observable.just(2,7,11).collectInto("beauty", new BiConsumer<String, Integer>() {
            @Override
            public void accept(String s, Integer i2) throws Exception {
                Log.e(TAG, "i1 = " + s + ",i2 = " + i2);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "accept:" + s);
            }
        });
    

    输出结果:

    i1 = beauty,i2 = 2
    i1 = beauty,i2 = 7
    i1 = beauty,i2 = 11
    accept:beauty
    

    8.2.2 reduce

    reducereduce

    Reduce 操作符对原始 Observable 发射数据的第一项应用一个函数,然后再将这个函数的返回值与第二项数据一起传递给函数,以此类推,持续这个过程直到原始 Observable 发射它的最后一项数据并终止,此时 Reduce 返回的 Observable 发射这个函数返回的最终值。

    注意:如果原始 Observable 没有发射任何数据,reduce 抛出异常 IllegalArgumentException。
    

    示例代码 1:

    Observable.just(2,7,11).reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(@NonNull Integer i1, @NonNull Integer i2) throws Exception {
            //i1 为前面几项计算得,i2 为当前发射的数据
                Log.e(TAG, "i1 = " + i1 + ",i2 = " + i2);
                return i1 * i2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e(TAG, "accept:" + integer);
            }
        });
    

    输出结果:

    i1 = 2,i2 = 7
    i1 = 14,i2 = 11
    accept:154
    

    示例代码 2:

     Observable.just(2, 7, 11).reduce(10, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(@NonNull Integer i1, @NonNull Integer i2) throws Exception {
            //i1 为前面几项计算得,i2 为当前发射的数据
                Log.e(TAG, "i1 = " + i1 + ",i2 = " + i2);
                return i1 * i2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e(TAG, "accept:" + integer);
            }
        });
    
    

    输出结果:

    i1 = 10,i2 = 2
    i1 = 20,i2 = 7
    i1 = 140,i2 = 11
    accept:1540
    

    8.2.3 reduceWith

    类似于 reduce(); 但可以聚合不同种类的数据。

    示例代码:

    Observable.just(2,7,11).reduceWith(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "beauty";
            }
        }, new BiFunction<String, Integer, String>() {
            @Override
            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                Log.e(TAG, "sep1:" + s + " ,sep2 = " + integer);
                return s + " - " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "accept:" + s);
            }
        });
    

    输出结果:

    sep1:beauty ,sep2 = 2
    sep1:beauty - 2 ,sep2 = 7
    sep1:beauty - 2 - 7 ,sep2 = 11
    accept:beauty - 2 - 7 - 11
    

    相关文章

      网友评论

        本文标题:RxJava2.0 操作符(8)—— Mathematical

        本文链接:https://www.haomeiwen.com/subject/nlqruxtx.html