美文网首页我爱编程
RxJava 算术和聚合操作符

RxJava 算术和聚合操作符

作者: 三流之路 | 来源:发表于2018-06-09 22:16 被阅读0次

    ReactiveX 系列文章目录


    concat/concatDelayError/concatArray/concatArrayDelayError

    将多个被观察者按先后顺序串联起来。

    // 前面有 2-4 个 ObservableSource 参数,内部调用的都是 concatArray
    public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
    
    // 数组和集合,意思一样
    public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources)            
    public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
    
    Observable.concat(observableStr, observableInt, observableBoolean)
               .subscribe(Consumer<Any> {
                   textView.text = "${textView.text}\nonNext $it"
               })
    

    将依次连续发送 observableStr,observableInt,observableBoolean 里的 12 个数据。


    // 区别在于有个参数 prefetch,和 combineLatest 的 buffersize 作用一样
    public static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources)
    public static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch)
    

    这两个重载方法,不知道怎么用,试验了若干次,终于不崩溃了。

    Java 语言版本:

    Observable observable = Observable.create(new ObservableOnSubscribe<ObservableSource>() {
            @Override
            public void subscribe(ObservableEmitter emitter) {
                emitter.onNext(observableInt);
                emitter.onNext(observableStr);
            }
        });
    
    Observable.concat(observable).subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) {
            Log.e("RX", o.toString());
        }
    });
    

    Kotlin 语言版本

    val observable = Observable.create<ObservableSource<*>> { emitter ->
        emitter.onNext(observableInt)
        emitter.onNext(observableStr)
    }
    
    Observable.concat<Any>(observable)
            .subscribe({ o -> Log.e("RX", o!!.toString()) })
    

    最终打出的日志按顺序是 1,2,3,4,5,a,b,c

    concatDelayError 和 concatArrayDelayError 是推迟发射 onError。

    concatEager/concatArrayEager

    public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources)
    public static <T> Observable<T> concatArrayEager(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources)
    
    // 集合和上面的可变参数可以看成一样的
    public static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources)
    public static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch)
    
    public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources)
    public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch)
    

    注释:

    Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
    source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
    in order, each one after the previous one completes.

    意思大概是说,一旦有观察者订阅了之后,会先将被观察者发射的数据缓存起来,然后将缓存的数据一个接一个的发射出去。

    对于外部调用来说,结果和 concat 作用没什么区别。它是并发处理多个 Observable,而不像 concat 那样串行处理,但是又能够保证最终的顺序。

    Observable.concatArrayEager(observableStr, observableInt, observableBoolean).subscribe({
        Log.e("RX", it.toString())
    })
    

    依次发射三个 Observable 的数据。


    有重载方法的参数是 ObservableSource<? extends ObservableSource<? extends T>> sources 应该和 concat 里用这个参数的重载方法差不多,就不写代码测试了。

    concatWith

    public final Observable<T> concatWith(ObservableSource<? extends T> other) {
        ObjectHelper.requireNonNull(other, "other is null");
        return concat(this, other);
    }
    

    非静态,自己和别人 concat。

    collect/collectInto

    收集发射的数据到一个数据结构里,然后将这个结构作为一个整体发射出去。

    Observable.just(18, "China", "Ma")
        .collect(Callable<MutableList<Any>> {
            arrayListOf()
        }, BiConsumer<MutableList<Any>, Any> {
            t1, t2 ->  t1.add(t2)
        }).subscribe(Consumer<MutableList<Any>> {
            Log.e("RX", "$it")
        })
    

    将三个零散的数据收集到一个列表里,最后收到 [18, China, Ma]

    collectInto 是将数据结构直接作为第一个参数传进去,而不需要通过回调提供一个数据结构。

    Observable.just(18, "China", "Ma")
        .collectInto(arrayListOf()
        , BiConsumer<MutableList<Any>, Any> {
            t1, t2 ->  t1.add(t2)
        }).subscribe(Consumer<MutableList<Any>> {
            Log.e("RX", "$it")
        })
    

    count

    返回发射的数目,并且将这个数目作为一个 64 位的 long 型值以 Single 发射出来。

    Observable.just(20,30,40).count()
            .subscribe(object : SingleObserver<Long> {
                override fun onSuccess(t: Long) {
                    textView.text = "${textView.text}\n $t"
                }
    
                override fun onSubscribe(d: Disposable) {
                }
    
                override fun onError(e: Throwable) {
                }
    })
    

    reduce/reduceWith

    // 返回 Maybe
    public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
    // 有初始值,返回 Single
    public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer)
    // 通过一个回调获取初始值
    public final <R> Single<R> reduceWith(Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer)
    

    一种累计运算。

    // 算乘积,结果是 6
    Observable.just(1, 2, 3)
      .reduce({ t1, t2 -> t1 * t2 }).subscribe {
          textView.text = "${textView.text}\n $it"
      }
    
    // 有初始值 10,结果是 60
    Observable.just(1, 2, 3)
        .reduce(10, { t1, t2 -> t1 * t2 })
        .subscribe(object: SingleObserver<Int> {
            override fun onSuccess(t: Int) {textView.text = "${textView.text}\n $t"}
            override fun onSubscribe(d: Disposable) {}
            override fun onError(e: Throwable) {}
        })
    
    // 有初始值 8,结果是 48
    Observable.just(1, 2, 3)
        .reduceWith({ 8 }, { t1, t2 -> t1 * t2 })
        .subscribe(object: SingleObserver<Int> {
            override fun onSuccess(t: Int) {textView.text = "${textView.text}\n $t"}
            override fun onSubscribe(d: Disposable) {}
            override fun onError(e: Throwable) {}
        })
    

    相关文章

      网友评论

        本文标题:RxJava 算术和聚合操作符

        本文链接:https://www.haomeiwen.com/subject/yyuxeftx.html