美文网首页
RxJava常用操作符

RxJava常用操作符

作者: 竖起大拇指 | 来源:发表于2021-03-30 16:34 被阅读0次

1.Map操作符

  Observable.create(new ObservableOnSubscribe<Integer>() {
                      @Override
                      public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                          emitter.onNext(1);
                          emitter.onNext(2);
                          emitter.onNext(3);
                      }
                  }).map(new Function<Integer, String>() {//Integer转换为String
                      @Override
                      public String apply(Integer integer) throws Exception {
                          return "This is result " + integer;
                      }
                  }).subscribe(new Consumer<String>() {
                      @Override
                      public void accept(String s) throws Exception {
                          Log.d(TAG, s);
                      }
                  });

上游我们发送的是数字类型,下游接受的是String类型,中间起转换作用的就是map操作符(上游发来的事件转换为任意类型)。
运行结果:

D/TAG: This is result 1
D/TAG: This is result 2
D/TAG: This is result 3

2.FlatMap

FlatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里,注:concatMapFlatMap一样的作用,只是contactMap是有序的,FlatMap是无序的

  Observable.create(new ObservableOnSubscribe<Integer>() {
              @Override
              public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                  emitter.onNext(1);
                  emitter.onNext(2);
                  emitter.onNext(3);
              }
          }).flatMap(new Function<Integer, ObservableSource<String>>() {
              @Override
              public ObservableSource<String> apply(Integer integer) throws Exception {
                  final List<String> list = new ArrayList<>();
                  for (int i = 0; i < 3; i++) {
                      list.add("I am value " + integer);
                  }
                  return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
              }
          }).subscribe(new Consumer<String>() {
              @Override
              public void accept(String s) throws Exception {
                  Log.d(TAG, s);
              }
          });  

打印如下:

    D/TAG: I am value 1  
    D/TAG: I am value 1  
    D/TAG: I am value 1  
    D/TAG: I am value 3  
    D/TAG: I am value 3 
    D/TAG: I am value 3  
    D/TAG: I am value 2  
    D/TAG: I am value 2  
    D/TAG: I am value 2  

3.concatWith 链接操作符

Flowable.just(3, 2, 4).concatWith(Flowable.just(100, 200))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("concatWith, value:" + integer);
                    }
                });

打印如下:

concatWith, value:3
concatWith, value:2
concatWith, value:4
concatWith, value:100
concatWith, value:200
 

4.filter 过滤操作符

image.png

会过滤掉值小于10的项

 Flowable.just("item2", "item1", "item7", "item8", "item9").filter(new Predicate<String>() {
            @Override
            public boolean test(String s) throws Exception {
                return s.equals("item1") || s.equals("item7");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("s = " + s);
            }
        });
}

打印如下:


s = item1
s = item7

5.distinct 去重操作

发射的数据与之前发射的数据相比,如果相同就丢掉。


image.png
 Flowable.just("item1", "item2", "item2", "item3").distinct().subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("s:" + s);
            }
});

输出结果:

s:item1
s:item2
s:item3

6.distinctUntilChanged

区分每一个被发出项目与前一个紧邻的项不同,如果相同,就不会发射,而distinct会丢弃所有相同的重复项,不管是不是紧邻着发射的。

 Flowable.just("item2", "item1", "item2", "item2", "item3").distinctUntilChanged().subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("s:" + s);
            }
        });

打印如下:

s:item2
s:item1
s:item2
s:item3

7.mergeWith合并操作符

将两个观察源合并成一个

 var flowable=  Observable.just(3.toLong()).delay(2, TimeUnit.SECONDS)

        Observable.intervalRange(4, 6, 1, 1, TimeUnit.SECONDS).mergeWith(flowable).subscribe{
            Log.e(TAG, "merge:"+it.toString())
        }

打印如下:

merge:4
merge:5
merge:3
merge:6
merge:7
merge:8
merge:9

注:concat与merge区别:
1.merge 将全部订阅 Observable,但是谁先完成谁先通知,如果大家完成时间一样,按顺序调用
2.concat会在第一个Observable调用onComplete后才能订阅下一个 Observable,(顺序调用)

  1. 对于contact,有一点务必要注意,那就是上一个数据源不可用,请务必subscriber.onCompleted(); 这样才能继续走下一个数据源。

8. debounce 去抖操作符

当调用函数N秒后,才会执行函数中动作,若在这N秒内又重复调用该函数则将取消前一次调用,并重新计算执行时间。

这个debounce在js常被使用,比如界面根据用户输入做ajax请求局部刷新页面,那么势必会重复请求接口,而实际可能在n秒内用户并没有完成输入,那么频繁调函数肯定是不准确的,那么可以设定一个debounce time,在n秒内的调用都是无效的。

 Observable.just(1,2,3,4).debounce(1,TimeUnit.MILLISECONDS).
                subscribe{
                    Log.e(TAG,it.toString())
                }

打印如下:

4

使用场景

现在几乎所有的 App 都有搜索功能 , 一般情况我们监听 EditText 控件,当值发生改变去请求搜索接口. 如:

etKey.addTextChangedListener(new TextWatcher() {
    @Override
    public void beforeTextChanged(CharSequence s, int start, int count, int after) {
    }

    @Override
    public void onTextChanged(CharSequence s, int start, int before, int count) {
    }

    @Override
    public void afterTextChanged(Editable s) {
        String key = etKey.getText().toString().trim();
        if (key.length() > 0){
            search(key); // 请求搜索接口,成功后把结果显示到界面上.
        }
    }
});

这样做有两个问题:

可能导致很多没有意义的请求,耗费用户流量(因为控件的值每更改一次立即就会去请求网络,而且只是最后输入的关键字是有用的)
可能导致最终搜索的结果不是用户想要的. 例如,用户一开始输入关键字 AB 这个时候出现两个请求, 一个请求是 A 关键字, 一个请求是 AB 关键字. 表面上是 A 请求先发出去, AB 请求后发出去. 如果后发出去的 AB 请求先返回, A 请求后返回,那么 A 请求后的结果将会覆盖 AB 请求的结果. 从而导致搜索结果不正确.

如何解决问题?

subscription = RxTextView.textChanges(etKey)
                .debounce(400, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
                //对用户输入的关键字进行过滤
               .filter(new Func1<CharSequence, Boolean>() {
                    @Override
                    public Boolean call(CharSequence charSequence) {
                        Log.d("RxJava", "filter is main thread : " + (Looper.getMainLooper() == Looper.myLooper()));
                        return charSequence.toString().trim().length() > 0;
                    }
                })
                .flatMap(new Func1<CharSequence, Observable<List<String>>>() {
                    @Override
                    public Observable<List<String>> call(CharSequence charSequence) {
                        Log.d("RxJava", getMainText("flatMap"));
                        return searchApi.search(charSequence.toString());
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<List<String>>() {
                    @Override
                    public void call(List<String> strings) {
                        tvContent.setText("search result:\n\n");
                        tvContent.append(strings.toString());
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        throwable.printStackTrace();
                        tvContent.append("Error:" + throwable.getMessage());
                    }
                });

上面代码的主要逻辑:

使用 debounce 操作符设置: 只有当用户输入关键字后 400 毫秒才发射数据(说的直白点就是 400 毫秒后才会走后面的逻辑)
使用 filter 操作符 对用户输入的关键字进行过滤:只有输入的关键字不为空,才会走后面的逻辑;
使用 flatMap 操作符:使用最终的关键字去请求搜索接口
至此,避免 EditText 每改变一次就请求一次的情况。
但是,还有一个问题,上面说的导致搜索结果的错乱,上面的代码还是没有解决,比如停止输入 400 毫秒后, 那么肯定会开始请求 Search 接口, 但是用户又会输入新的关键字,这个时候上个请求还没有返回, 新的请求又去请求 Search 接口.这个时候有可能最后的一个请求返回, 第一个请求最后返回,导致最终显示的结果是第一次搜索的结果.

怎么去解决这个问题:可以使用 switchMap 操作符解决。

switchMap 操作符和 flatMap 操作符差不多,区别是 switchMap 操作符只会发射(emit)最近的 Observables。

也就是说,当 400 毫秒后,发出第一个搜索请求,当这个请求的过程中,用户又去搜索了,发出第二个请求,不管怎样,switchMap 操作符只会发射第二次请求的 Observable。所以,在上面的代码基础上把 flatMap 改成 switchMap 就可以了。

9.take()

控制观察者接收的事件的数量


Observable.interval(0,1,TimeUnit.SECONDS)
                .take(60)
                .subscribe(object : Observer<Long> {
                    override fun onNext(integer: Long) {
                        
                        Log.e(TAG, "==================onNext $integer")
                    }


                    override fun onComplete() {
                        Log.e(TAG, "==================onComplete ")
                    }

                    override fun onSubscribe(d: Disposable) {
                        Log.e(TAG, "==================onSubscribe ")

                    }

                    override fun onError(e: Throwable) {
                        Log.e(TAG, "==================onSubscribe ")

                    }
                })

如上代码可以实现一个短信验证码倒计时的功能。

10.takeUntil

可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了。

    Observable.just(1,2,3,4,5,6).takeUntil {
            it<3
        }.subscribe { Log.e(TAG,"takeUntil:${it}") }

打印结果:

takeUntil:1

11.concat 组合操作符

讲多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat()最多只可以发送4个事件。

Observable.concat(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8))
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

打印如下:

================onNext 1
================onNext 2
================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8

12.retry

如果出现错误事件,则会重新发送所有事件序列。times是代表重新发的次数。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new Exception("404"));
    }
})
.retry(2)
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

打印如下:

 ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onError 

13.retryWhen()

当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送Error事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件。

Observable.create(new ObservableOnSubscribe < String > () {
    @Override
    public void subscribe(ObservableEmitter < String > e) throws Exception {
        e.onNext("chan");
        e.onNext("ze");
        e.onNext("de");
        e.onError(new Exception("404"));
        e.onNext("haha");
    }
})
.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
    @Override
    public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
        return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
            @Override
            public ObservableSource <? > apply(Throwable throwable) throws Exception {
                if(!throwable.toString().equals("java.lang.Exception: 404")) {
                    return Observable.just("可以忽略的异常");
                } else {
                    return Observable.error(new Throwable("终止啦"));
                }
            }
        });
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "==================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError " + e.toString());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

打印如下:

 ==================onSubscribe 
 ==================onNext chan
==================onNext ze
==================onNext de
==================onError java.lang.Throwable: 终止啦

将onError(new Exception("404")) 改为onError(new Exception("303")) 看看打印结果:

==================onNext chan
 ==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
......

14.concatDelayError()

描述:在使用concat()时,如果其中一个观察者发出了error事件,则其他所有的观察者事件都会停止。而concatDelayError()就是为了解决这一问题,使用此操作符,即使其中一个观察者发出了error事件,其他事件会继续发送,直到所有事件发送完毕,才出发onError。

Observable.concatArrayDelayError(
                //从1开始,共发送3个,延时1s发送第一个,之后间隔1s发送余下数字
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onError(new NullPointerException());
                        emitter.onComplete();
                    }
                }),
                //从11开始,共发送3个,延时1s发送第一个,之后间隔1s发送余下数字
                Observable.just(3, 4, 5))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.i(TAG,"onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.i(TAG,"onNext:"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i(TAG,"onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.i(TAG,"onComplete");
                    }
                });

15.zip()

会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。

Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
    .map(new Function<Long, String>() {
        @Override
        public String apply(Long aLong) throws Exception {
            String s1 = "A" + aLong;
            Log.d(TAG, "===================A 发送的事件 " + s1);
            return s1;
        }}),
        Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
            .map(new Function<Long, String>() {
            @Override
            public String apply(Long aLong) throws Exception {
                String s2 = "B" + aLong;
                Log.d(TAG, "===================B 发送的事件 " + s2);
                return s2;
            }
        }),
        new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                String res = s + s2;
                return res;
            }
        })
.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "===================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});

上面代码中有两个 Observable,第一个发送事件的数量为5个,第二个发送事件的数量为6个。现在来看下打印结果:

05-22 09:10:39.952 5338-5338/com.example.rxjavademo D/chan: ===================onSubscribe 
05-22 09:10:40.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
05-22 09:10:40.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
===================onNext A1B1
05-22 09:10:41.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
05-22 09:10:41.954 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
===================onNext A2B2
05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
05-22 09:10:42.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================onNext A3B3
05-22 09:10:43.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
05-22 09:10:43.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
05-22 09:10:43.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A4B4
05-22 09:10:44.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A5
05-22 09:10:44.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
05-22 09:10:44.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A5B5
===================onComplete 

可以发现最终接收到的事件数量是5,那么为什么第二个 Observable 没有发送第6个事件呢?因为在这之前第一个 Observable 已经发送了 onComplete 事件,所以第二个 Observable 不会再发送事件。

相关文章

网友评论

      本文标题:RxJava常用操作符

      本文链接:https://www.haomeiwen.com/subject/xycghltx.html