美文网首页
RxJava 2.x下篇(操作符)

RxJava 2.x下篇(操作符)

作者: luoqiang108 | 来源:发表于2018-11-12 19:30 被阅读0次

前言

通过上篇的实例,相信你对RxJava的用处肯定有个基本的了解了。如果还没看过,我强烈建议你先去看上篇,先对整体有个了解,有兴趣之后,带着目的去学枯燥乏味而且多的操作符。

  • RxJava操作符分类 RxJava操作符分类.png

看到这一堆的操作符是不是感觉瞬间就不想往下学了,但是我这里只挑部分我认为比较常用的来讲下,毕竟这么多要全讲的话不仅费时费力,而且效果也不太好,先学会常用的,其它的也是类似的,到时用的再去查询下上手也会很快的。

  • RxJava三部曲

第一步:初始化 Observable
第二步:初始化 Observer
第三步:通过subscribe建立订阅关系

创建操作符

  • 创建被观察者( Observable) 对象 & 发送事件。

1、create()操作符

  • create 操作符应该是最常见的操作符了,主要用于产生一个 Obserable 被观察者对象。
  • 被观察者 Observable 也可以称为发射器(上游事件),观察者 Observer 也可以称为接收器(下游事件)。 create操作符.png
  • 页面部分代码和其它一些基础部分就不贴了,仅贴出关键代码和运行效果日志。这里做个说明页面其实很简单就一个按钮,按钮点击之后就去执行贴出的关键部分代码。
  • 按钮点击事件里面的关键部分代码:
        final String TAG = "RxCreateActivity";
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                Log.e(TAG, "Observable emit 4" + "\n");
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n");
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e(TAG, "onNext : value : " + integer + "\n");
                i++;
                if (i == 2) {
                    // 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
                    mDisposable.dispose();
                    Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete" + "\n");
            }
        });
  • 运行效果日志:
onSubscribe : false
Observable emit 1
onNext : value : 1
Observable emit 2
onNext : value : 2
onNext : isDisposable : true
Observable emit 3
Observable emit 4
  • 需要注意的几点是:
  • 并且 2.x 中有一个 Disposable 概念,这个东西可以直接调用切断接收器,可以看到,当它的 isDisposed() 返回为 false 的时候,接收器能正常接收事件,但当其为 true 的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。
  • 在发射事件中,我们在发射了数值 3 之后,直接调用了 e.onComplete(),由于在接收器接收了数值 2 之后接收器调用dispose()方法停止接收了,所以本该在接收器中执行的onComplete()方法就不会执行了。
  • 调用e.onComplete()方法之后接收器就不会再接收事件了,但发送事件还是继续的,通过日志能打印出来发射数值3和4我们就可以分析出这一点。如果调用e.onError(Throwable error)效果也是和onComplete()方法类似的,e.onError(Throwable error)对应执行的是接收器中的onError(@NonNull Throwable e)方法。
  • 另外一个值得注意的点是,在 RxJava 2.x 中,可以看到发射事件方法相比 1.x 多了一个 throws Excetion抛出异常,意味着我们做一些特定操作再也不用 try-catch 了。

2、just()操作符

  • 简单的发射器,快速的创建被观察者对象,依次调用onNext() 方法直接发送传入的事件对象。

注:最多只能发送10个参数

        // 1. 创建时传入字符串型1、2、3,
        // 当然也可以为其它类型,被观察者发射事件对象声明为泛型,但是一般同一个被观察者事件类型都是一致的
        // 在创建后就会发送这些对象,相当于执行了onNext("1")、onNext("2")、onNext("3")、onComplete()
        Observable.just("1", "2", "3")
                // 2. 通过通过订阅(subscribe)连接观察者和被观察者
                .subscribe(
                        // 3. 创建观察者 & 定义响应事件的行为
                        // Consumer是RxJava 2.x提供的用于实现观察者的简便式模式
                        new Consumer<String>() {
                            @Override
                            // 每次接收到Observable的onNext()方法发射的事件都会调用Consumer.accept()
                            public void accept(@NonNull String s) throws Exception {
                                Log.e(TAG, "accept : onNext : " + s + "\n");
                            }
                        });
accept : onNext : 1
accept : onNext : 2
accept : onNext : 3

3、fromArray()操作符

  • 数组数据发射器,直接发送传入的数组元素数据,会将数组中的数据转换为Observable对象
        // 1. 设置需要传入的数组
        Integer[] items = {0, 1, 2, 3, 4};
        // 2. 创建被观察者对象(Observable)时传入数组
        // 在创建后就会将该数组转换成Observable & 发送该对象中的所有数据
        Observable.fromArray(items)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "对Complete事件作出响应");
                    }
                });
开始采用subscribe连接
接收到了事件0
接收到了事件1
接收到了事件2
接收到了事件3
接收到了事件4
对Complete事件作出响应

4、fromIterable()操作符

  • 集合数据发射器,直接发送传入的集合List里数据,会将集合中的数据转换为Observable对象,与数组类似,这里就不贴代码了。

5、timer()操作符

  • 相当于一个定时任务,延迟指定时间后,发送1个数值0

注意:默认在新线程,也就是子线程

timer操作符.png
        // 注:timer操作符默认运行在一个新线程上
        // 也可自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)
        // 该例子 = 延迟2s后,发送一个long类型数值0
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "开始采用subscribe连接");
                    }
                    @Override
                    public void onNext(Long value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "对Error事件作出响应");
                    }
                    @Override
                    public void onComplete() {
                        Log.e(TAG, "对Complete事件作出响应");
                    }
                });
11-12 14:24:19.586 E: 开始采用subscribe连接
11-12 14:24:21.589 E: 接收到了事件0
11-12 14:24:21.590 E: 对Complete事件作出响应

6、interval()操作符

  • 每隔指定时间就发送事件,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。

注意:
默认在新线程,也就是子线程
发送的事件序列 = 从0开始、无限递增1的的整数序列

interval操作符.png
private Disposable mDisposable;

        // 注:interval默认在computation调度器上执行
        // 也可自定义指定线程调度器(第3个参数):interval(long,long,TimeUnit,Scheduler)
        /*参数说明:
          参数1 = 第1次延迟时间;
          参数2 = 间隔时间数字;
          参数3 = 时间单位;*/
        Observable.interval(3, 1, TimeUnit.SECONDS)
                // 该例子发送的事件序列特点:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        mDisposable = d;
                        // 默认最先调用复写的 onSubscribe()
                        Log.e(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "对Complete事件作出响应");
                    }
                });
        //使用简便的Consumer观察者,返回值Disposable用于停止任务
        mDisposable = Observable.interval(3, 1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {

            }
        });
11-12 14:33:22.057 E: 开始采用subscribe连接
11-12 14:33:25.060 E: 接收到了事件0
11-12 14:33:26.060 E: 接收到了事件1
11-12 14:33:27.060 E: 接收到了事件2
11-12 14:33:28.060 E: 接收到了事件3
11-12 14:33:29.060 E: 接收到了事件4
...
  • 由于我们这个是间隔执行,所以当我们的Activity 都销毁的时候,实际上这个操作还依然在进行,所以,我们得花点小心思让我们在不需要它的时候干掉它。然而,心细的小伙伴可能会发现我们上面代码里面有一个mDisposable对象,其实这个对象就是我留来停止任务的。
  @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null && !mDisposable.isDisposed()) {
            mDisposable.dispose();
        }
    }

7、defer()操作符

  • 直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件,这可以确保Observable对象里的数据是最新的。
//1. 第1次对i赋值
private Integer i = 10;

        // 2. 通过defer 定义被观察者对象
        // 注:此时被观察者对象还没创建
        Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(i);
            }
        });
        //2. 第2次对i赋值
        i = 15;
        //3. 观察者开始订阅
        // 注:此时,才会调用defer()创建被观察者对象(Observable)
        observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到的整数是" + integer);
            }
        });
  • 因为是在订阅时才创建,所以i值会取第2次的赋值
11-12 15:06:38.161 E: 接收到的整数是15

变换操作符

  • 对事件序列中的事件 / 整个事件序列进行加工处理(即变换),使得其转变成不同的事件 / 整个事件序列。

1、map()操作符

  • 对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件;即, 将被观察者发送的事件转换为任意的类型事件。


    map操作符.png
Observable.just(1, 2, 3).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "accept : " + s + "\n");
            }
        });
  • map 基本作用就是将一个 Observable 通过某种函数关系,转换为另一种 Observable,上面例子中就是把我们的 Integer 数据变成了 String 类型。从下面Log日志显而易见。
E: accept : This is result 1
E: accept : This is result 2
E: accept : This is result 3

2、flatMap()操作符

  • 将被观察者发送的事件序列中的每个事件进行拆分,拆分成多个事件序列;之后对拆分出来的事件进行单独转换,转换完成后再将事件无序的合并成一个新的事件序列,最后再进行发送。
        // 采用RxJava基于事件流的链式操作,也就是:对象.XXX()方法
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        })
                // 采用flatMap()变换操作符
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        final List<String> list = new ArrayList<>();
                        for (int i = 0; i < 3; i++) {
                            list.add("我是事件 " + integer + "拆分后的子事件" + i);
                            // 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件的事件序列
                            // 最终合并,再发送给观察者
                        }
                        //合并之后发射事件加个延时,不然执行速度太快可能很难出现无序的现象
                        int delayTime = (int) (1 + Math.random() * 10);
                        return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, s);
                    }
                });
  • 新合并生成的事件序列顺序是无序的,即与旧序列发送事件的顺序无关。很可能你的得到的运行得到的结果事件还是3个为一组的出现(也就是下面第一行日志和第四行替换的结果),但是事件不按组出现情况也是存在的,就如下面的日志就是我执行出来不按组出现的运行结果,只是相对来讲会难复现一点。所以你只要记住新合并生成的事件序列是无序就行了。
我是事件 1拆分后的子事件0
我是事件 3拆分后的子事件0
我是事件 3拆分后的子事件1
我是事件 3拆分后的子事件2
我是事件 1拆分后的子事件1
我是事件 1拆分后的子事件2
我是事件 2拆分后的子事件0
我是事件 2拆分后的子事件1
我是事件 2拆分后的子事件2

3、concatMap()操作符

  • concatMap 与 FlatMap 的唯一区别就是 concatMap 保证了顺序,所以,我们就直接把 flatMap 替换为
    concatMap 验证吧。代码就不贴了,直接将上面 flatMap 的关键代码中的flatMap替换为concatMap就好了。下面我们贴下运行结果:
我是事件 1拆分后的子事件0
我是事件 1拆分后的子事件1
我是事件 1拆分后的子事件2
我是事件 2拆分后的子事件0
我是事件 2拆分后的子事件1
我是事件 2拆分后的子事件2
我是事件 3拆分后的子事件0
我是事件 3拆分后的子事件1
我是事件 3拆分后的子事件2

新合并生成的事件序列顺序是有序的,即严格按照旧序列发送事件的顺序,可能有时候这样打印出来日志结果还是无序的,这个原因是因为我们加了延时导致日志打印顺序变了导致的,忽略就好了。这里就是记住是有序的就好了。

4、buffer()操作符

  • 定期从被观察者(Obervable)需要发送的事件中获取一定数量的事件 & 放到缓存区中,最终发送。
        Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                /*参数解释:
                  count:缓存区大小 = 每次从被观察者中获取的事件数量
                  skip:步长 = 每次获取新事件时跳过事件的数量*/
                .buffer(3, 4) //在事件足够的时候每次取count个值,每次跳过skip个事件
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(@NonNull List<Integer> integers) throws Exception {
                        Log.e(TAG, "buffer size : " + integers.size() + "\n");
                        Log.e(TAG, "buffer value : ");
                        for (Integer i : integers) {
                            Log.e(TAG, i + "");
                        }
                        Log.e(TAG, "\n");
                    }
                });
  • 看了下面日志结果,相信聪明的你肯定也知道buffer操作符的用法了。count参数还是比较好理解的,skip可能有些人会一时反应不过来,这里简单说下,skip其实就是每组之间发射的间隔,这里再结合上面实例说明下就很清晰了。就拿发射第二次事件的第一个value值怎么得来来说好了,这个值就是第一次事件的第一个value1+步长4得来的,1+4=5就是这么简单。
buffer size : 3
buffer value : 
1
2
3
buffer size : 3
buffer value : 
5
6
7
buffer size : 2
buffer value : 
9
10

组合/合并操作符

  • 组合多个被观察者(Observable) & 合并需要发送的事件

组合被观察者数量≤4个,如果要组合大于4个的观察者用concatArray()操作符,用法是一样的这里对concatArray操作符就略过了

1、concat()操作符

  • 把两个发射器连接成一个发射器,发射器合并后按发送顺序串行执行。


    concat操作符.png
Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "concat : "+ integer + "\n" );
                    }
                });
concat : 1
concat : 2
concat : 3
concat : 4
concat : 5
concat : 6
  • 从日志可以看出,发射器 B 把自己的三个孩子送给了发射器 A,让他们组合成了一个新的发射器。

2、merge()操作符

  • 组合多个被观察者一起发送数据,合并后按时间线并行执行。
  • 区别上述concat()操作符:同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行,而merge操作符是按时间线并行执行的。


    merge.png
        // merge():组合多个被观察者(<4个)一起发送数据
        // 注:合并后按照时间线并行执行
        Observable.merge(
                /*intervalRange操作符参数:
                 * long start:发射事件起始值
                 * long count:发射事件数量
                 * long initialDelay:初始发射延时
                 * long period:反射周期
                 * */
                // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
                // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

日志输出结果:两个被观察者发送事件并行执行,输出结果 = 0,2 -> 1,3 -> 2,4


merge操作符.gif

3、zip()操作符

  • 合并多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送。
  • 被观察者之间两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。
        //需要被合并的第一个被观察者:发射字符串类型事件
        Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("A");
                    Log.e(TAG, "String emit : A \n");
                    e.onNext("B");
                    Log.e(TAG, "String emit : B \n");
                    e.onNext("C");
                    Log.e(TAG, "String emit : C \n");
                }
            }
        });

        //需要被合并的第二个被观察者:发射整型事件
        Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext(1);
                    Log.e(TAG, "Integer emit : 1 \n");
                    e.onNext(2);
                    Log.e(TAG, "Integer emit : 2 \n");
                    e.onNext(3);
                    Log.e(TAG, "Integer emit : 3 \n");
                    e.onNext(4);
                    Log.e(TAG, "Integer emit : 4 \n");
                    e.onNext(5);
                    Log.e(TAG, "Integer emit : 5 \n");
                }
            }
        });
        
        Observable.zip(stringObservable, integerObservable, new BiFunction<String, Integer, String>() {
            @Override
            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "zip : accept : " + s + "\n");
            }
        });
String emit : A 
String emit : B 
String emit : C 
zip : accept : A1
Integer emit : 1 
zip : accept : B2
Integer emit : 2 
zip : accept : C3
Integer emit : 3 
Integer emit : 4 
Integer emit : 5 
  • zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的。
  • 最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同,上面就是stringObservable发射器。

4、reduce()操作符

  • 把被观察者需要发送的事件聚合成1个事件 & 发送
  • 聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推


    reduce.png
Observable.just(1, 2, 3, 4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    // 在该复写方法中复写聚合的逻辑
                    @Override
                    public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                        Log.e(TAG, "本次计算的数据是:" + s1 + " 乘 " + s2);
                        return s1 * s2;
                        // 本次聚合的逻辑是:全部数据相乘起来
                        // 原理:第1次取前2个数据相乘,之后每次获取到的数据 = 返回的数据 * 原始下1个数据每
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最终计算的结果是: " + s);
            }
        });
本次计算的数据是:1 乘 2
本次计算的数据是:2 乘 3
本次计算的数据是:6 乘 4
最终计算的结果是: 24

功能操作符

  • 辅助被观察者(Observable)在发送事件时实现一些功能性需求如错误处理、线程调度等等。

1、subscribe()操作符

  • 订阅,即连接观察者 & 被观察者。Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时。subscribe操作符上面已经用过很多次了,相信大家都已经知道它的用法和作用了,这里就不再啰嗦了。

2、subscribeOn()操作符

  • 用于指定被观察者的工作线程类型,若Observable.subscribeOn()多次指定被观察者生产事件的线程,则只有第一次指定有效,其余的指定线程无效。
  • RxJava中,内置的线程类型:
类型 含义 应用场景
Schedulers.immediate() 当前线程 = 不指定线程 默认
AndroidSchedulers.mainThread() Android主线程 操作UI
Schedulers.newThread() 常规新线程 耗时等操作
Schedulers.io() io操作线程 网络请求、读写文件等io密集型操作
Schedulers.computation() CPU计算操作线程 大量计算操作

3、observeOn()操作符

  • 用于指定观察者的工作线程类型,若Observable.observeOn()多次指定观察者接收 & 响应事件的线程,则每次指定均有效,即每指定一次,就会进行一次线程的切换,下游的线程就会切换到指定线程。
Observable.just(1, 2, 3).subscribeOn(Schedulers.newThread()) //指定被观察者工作线程为子线程
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception { //这里没指定线程会延续被观察者的子线程

                    }
                })
                .observeOn(Schedulers.io()) //指定观察者工作线程为io线程
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception { //io线程
                        return null;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()) //切换到UI线程
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception { //UI线程

                    }
                });

4、delay()操作符

  • 使得被观察者延迟一段时间再发送事件
  • delay()具备多个重载方法,具体如下:
// 1. 指定延迟时间
// 参数1 = 时间;参数2 = 时间单位
delay(long delay,TimeUnit unit)

// 2. 指定延迟时间 & 调度器
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延迟时间  & 错误延迟
// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常
// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延迟时间 & 调度器 & 错误延迟
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟
  • 具体使用
Observable.just(1, 2, 3)
                .delay(3, TimeUnit.SECONDS) // 延迟3s再发送(onNext(1)、onNext(2)、onNext(3)),其它重载方法由于使用类似,所以此处不作全部展示
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                    }
                });

5、retry()操作符

  • 重试,即当出现错误时,让被观察者(Observable)重新发射数据
  1. 接收到 onError()时,重新订阅 & 发送事件
  2. Throwable 和 Exception都可拦截
  • 共有5种重载方法
<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送

<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
// 参数 = 重试次数
 
<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
// 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)

<-- 5. retry(long time,Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
// 参数 = 设置重试次数 & 判断逻辑
  • 具体使用,重载方法较多,这里就举个最简单的例子,其它的就自行扩展了
//retry()
        // 作用:出现错误时,让被观察者重新发送数据
        // 注:若一直错误,则一直重新发送
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })
                .retry() // 遇到错误时,让被观察者重新发射数据(若一直错误,则一直重新发送
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "对Complete事件作出响应");
                    }
                });
  • 运行日志输出如下,会一直不停的循环打印
接收到了事件1
接收到了事件2
接收到了事件1
接收到了事件2
...

过滤操作符

1、take()操作符

  • 指定观察者最多能接收到的事件数量
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 1. 发送5个事件
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
            }

            // 采用take()变换操作符
            // 指定了观察者只能接收2个事件
        }).take(2)
        .subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "开始采用subscribe连接");
            }

            @Override
            public void onNext(Integer value) {
                Log.e(TAG, "过滤后得到的事件是:"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "对Complete事件作出响应");
            }
        });

// 实际上,可理解为:被观察者还是发送了5个事件,只是因为操作符的存在拦截了3个事件,最终观察者接收到的是2个事件
开始采用subscribe连接
过滤后得到的事件是:1
过滤后得到的事件是:2
对Complete事件作出响应

条件/布尔操作符

背压

  • 背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
  • Observeable是不支持背压的的被观察者,Flowable是支持背压的被观察者,还有相应与之对应的观察者如下图:


    image.png

结语

最近的博客可能都会有些显得匆忙,里面有些部分甚至都没有写完(之后有空会来完善)。这当中主要原因是自己为了达成今年共50篇博客的目标,加之有些博客的知识点确实太多了,要写全的话时间就来不及了,所以就先大体写下,缺的那点其实是无伤大雅的,你要认真看下来收获还是挺大的。

感谢

Carson_Ho的一系列关于RxJava的文章
这可能是最好的 RxJava 2.x 入门教程(完结版)

相关文章

网友评论

      本文标题:RxJava 2.x下篇(操作符)

      本文链接:https://www.haomeiwen.com/subject/uhtaxqtx.html