美文网首页
笔记:Rxjava(二)操作符

笔记:Rxjava(二)操作符

作者: RoJacKing | 来源:发表于2018-08-23 14:35 被阅读6次

    笔记:RxJava(一)

    一、Single

    顾名思义,Single 只会接收一个参数,而 SingleObserver 只会调用 onError() 或者 onSuccess()。

    使用如下:

    Single.just(new Random().nextInt())
                    .subscribe(new SingleObserver<Integer>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
    
                        }
    
                        @Override
                        public void onSuccess(@NonNull Integer integer) {
                            mRxOperatorsText.append("single : onSuccess : "+integer+"\n");
                            Log.e(TAG, "single : onSuccess : "+integer+"\n" );
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            mRxOperatorsText.append("single : onError : "+e.getMessage()+"\n");
                            Log.e(TAG, "single : onError : "+e.getMessage()+"\n");
                        }
                    });
    

    二、distinct

    去重操作符,简单的作用就是去重。

    使用如下:

    Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                    .distinct()
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            mRxOperatorsText.append("distinct : " + integer + "\n");
                            Log.e(TAG, "distinct : " + integer + "\n");
                        }
                    });
    
    image.png

    三、debounce

    去除发送频率过快的项,看起来好像没啥用处,但你信我,后面绝对有地方很有用武之地。

    使用如下:

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                    // send events with simulated time wait
                    emitter.onNext(1); // skip
                    Thread.sleep(400);
                    emitter.onNext(2); // deliver
                    Thread.sleep(505);
                    emitter.onNext(3); // skip
                    Thread.sleep(100);
                    emitter.onNext(4); // deliver
                    Thread.sleep(605);
                    emitter.onNext(5); // deliver
                    Thread.sleep(510);
                    emitter.onComplete();
                }
            }).debounce(500, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            mRxOperatorsText.append("debounce :" + integer + "\n");
                            Log.e(TAG,"debounce :" + integer + "\n");
                        }
                    });
    
    image.png

    四、defer

    简单地时候就是每次被订阅都会创建一个新的 Observable,如果没有被订阅,就不会产生新的 Observable

    使用如下:

    Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> call() throws Exception {
                    return Observable.just(1, 2, 3);
                }
            });
    
    
            observable.subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    mRxOperatorsText.append("defer : " + integer + "\n");
                    Log.e(TAG, "defer : " + integer + "\n");
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    mRxOperatorsText.append("defer : onError : " + e.getMessage() + "\n");
                    Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
                }
    
                @Override
                public void onComplete() {
                    mRxOperatorsText.append("defer : onComplete\n");
                    Log.e(TAG, "defer : onComplete\n");
                }
            });
    
    image.png

    五、last

    last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。

    使用如下:

    Observable.just(1, 2, 3)
                    .last(4)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            mRxOperatorsText.append("last : " + integer + "\n");
                            Log.e(TAG, "last : " + integer + "\n");
                        }
                    });
    
    image.png

    六、merge

    merge 顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,而在 Rx 操作符中,merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。

    使用如下

    Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            mRxOperatorsText.append("merge :" + integer + "\n");
                            Log.e(TAG, "accept: merge :" + integer + "\n" );
                        }
                    });
    
    image.png

    七、reduce

    reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。

    使用如下:

    Observable.just(1, 2, 3)
                   .reduce(new BiFunction<Integer, Integer, Integer>() {
                       @Override
                       public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                           return integer + integer2;
                       }
                   }).subscribe(new Consumer<Integer>() {
               @Override
               public void accept(@NonNull Integer integer) throws Exception {
                   mRxOperatorsText.append("reduce : " + integer + "\n");
                   Log.e(TAG, "accept: reduce : " + integer + "\n");
               }
           });
    
    image.png

    可以看到,代码中,我们中间采用 reduce ,支持一个 function 为两数值相加,所以应该最后的值是:1 + 2 = 3 + 3 = 6 , 而Log 日志完美解决了我们的问题。

    八、scan

    scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出。

    使用如下:

    Observable.just(1, 2, 3)
                    .scan(new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                            return integer + integer2;
                        }
                    }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    mRxOperatorsText.append("scan " + integer + "\n");
                    Log.e(TAG, "accept: scan " + integer + "\n");
                }
            });
    
    image.png image.png

    九、window

    按照实际划分窗口,将数据发送给不同的 Observable

    使用如下:

    mRxOperatorsText.append("window\n");
           Log.e(TAG, "window\n");
           Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
                   .take(15) // 最多接收15个
                   .window(3, TimeUnit.SECONDS)
                   .subscribeOn(Schedulers.io())
                   .observeOn(AndroidSchedulers.mainThread())
                   .subscribe(new Consumer<Observable<Long>>() {
                       @Override
                       public void accept(@NonNull Observable<Long> longObservable) throws Exception {
                           mRxOperatorsText.append("Sub Divide begin...\n");
                           Log.e(TAG, "Sub Divide begin...\n");
                           longObservable.subscribeOn(Schedulers.io())
                                   .observeOn(AndroidSchedulers.mainThread())
                                   .subscribe(new Consumer<Long>() {
                                       @Override
                                       public void accept(@NonNull Long aLong) throws Exception {
                                           mRxOperatorsText.append("Next:" + aLong + "\n");
                                           Log.e(TAG, "Next:" + aLong + "\n");
                                       }
                                   });
                       }
                   });
    
    image.png image.png

    相关文章

      网友评论

          本文标题:笔记:Rxjava(二)操作符

          本文链接:https://www.haomeiwen.com/subject/lmuobftx.html