美文网首页
RxJava3.x学习记录

RxJava3.x学习记录

作者: 打工崽 | 来源:发表于2021-05-10 15:03 被阅读0次

    RxJava3.x基本用法

    1. 概述

    1.1 ReactiveX与RxJava

    RxJava是ReactiveX的一种Java实现,Rx是一个函数库,开发者可利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序。开发者可利用Observables表示异步数据流,用LINQ查询异步数据流,用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers

    1.2 为何用RxJava

    异步操作可以使用AsyncTask和Handler,但随着请求越来越多逻辑会非常复杂,但RxJava仍旧可以保持清晰的逻辑,RxJava的原理就是创建一个Observable对象来处理逻辑,然后使用各种操作符建立起来的链式操作,如同流水线一样处理数据一步步加工后发射给Subscriber处理

    1.3 背压

    背压是指一部场景中,被观察者发送事件的速度远快于观察者处理事件速度的情况下,一种告知上游的被观察者降低发送速度的策略

    1.4 观察者和被观察者

    RxJava的异步操作是通过扩展的观察者模式来实现的,RxJava中Observable代表被观察者,Observer代表观察者,RxJava3.x中有以下几个被观察者
    Observable:发送0个或N个数据,不支持背压。原本是支持的,RxJava2.x后由Flowable支持,因此改成不支持背压

    Flowable:发送0个或N个数据,支持背压。是RxJava2.x后的新类型

    Single:只处理onSuccess和onError事件,只能发送单个数据或者发送一个错误

    Completable:创建后不会发射任何数据,只处理onComplete和onError事件

    Maybe:能够发射0个或1个数据,是RxJava2.x后的新类型


    2. 基本实现

    使用前配置gradle

    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
    implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
    

    其中RxAndroid是RxJava在Android平台的扩展,包含了一些简化Android开发的工具,比如特殊的调度器等

    RxJava的基本用法分为三个步骤,如下

    2.1 创建Observer(观察者)

    它决定事件触发的时候将有怎样的行为,代码如下

    Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d("MainActivity","onSubscribe");
            }
    
            @Override
            public void onNext(@NonNull String s) {
                Log.d("MainActivity","onNext" + s);
            }
    
            @Override
            public void onError(@NonNull Throwable e) {
                Log.d("MainActivity","onError");
            }
    
            @Override
            public void onComplete() {
                Log.d("MainActivity","onComplete");
            }
        };
    

    各方法含义如下
    onComplete:事件队列完结,RxJava不仅把每个事件单独处理,而且还会把他们看作一个队列,当不会再有新的onNext()发出时,需要触发onComplete()方法作为完成标志

    onError:事件队列异常,在事件处理过程中出现异常时,onError()方法会被触发,同时队列自动终止,不允许再有事件发出

    onNext:普通的事件,将要处理的事件添加到事件队列中

    onSubscribe:当订阅时会被调用

    2. 创建Observer(被观察者)

    决定什么时候触发事件以及触发怎样的事件,RxJava使用create()方法来创建一个Observable,并为它定义事件触发规则

    Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("我是谁");
                emitter.onNext("我在哪");
                emitter.onComplete();
            }
        });
    

    通过调用subscribe()方法不断将事件添加到任务队列中,也可以用just方法实现

    Observable observable = Observable.just("我是谁","我在哪");
    

    3. Subscribe订阅

    订阅只需要一行代码即可

    observable.subscribe(observer);
    

    运行代码查看log


    image.png

    先调用onSubscribe(),再调用两个onNext(),最后完成了调用onComplete()方法


    RxJava3.x的Subject和Processor

    这两个有些相似,一起说,先介绍较为复杂的Subject

    1. Subject的分类

    Subject既可以是一个Observer,也可以是一个Observerable,它是链接Observer和Observerable的桥梁,因此Subject可以被理解为Subject = Observer + Observable。RxJava提供了8种Subject,如下

    1.1 PublishSubject

    PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者,需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里可能会有一个风险:在Subject被创建后,到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失,如果要确保来自原始Observable的所有数据都被分发,则可以当所有观察者都已经订阅时才开始发射数据,或者改用ReplaySubject

    1.2 BehaviorSubject

    当Observer订阅BehaviorSubject时,BehaviorSubject开始发射原始Observer最近发射的数据,如果Observer此时还没有收到任何数据,则BehaviorSubject会发射一个默认值,然后继续发射其他任何来自原始Observable的数据,如果原始的Observable因为发生错误而终止,则BehaviorSubject不会发射任何数据,但是会向Observer传递一个异常数据

    1.3 ReplaySubject

    不管Observer何时订阅ReplaySubject,ReplaySubject都会向Observer发射所有来自原始Observable的数据,有不同类型的ReplaySubject,它们用来限定Replay的范围,比如设定Buffer的具体大小,或者设定具体的时间范围。如果用ReplaySubject作为Observer,则注意不要在多个线程中调用onNext()、onComplete()、onError(),这可能会导致数据发送错乱,并且违反Observer规则

    1.4 AsyncSubject

    当Observable完成时,AsyncSubject只会发射来自原始Observable的最后一个数据,如果原始的Observable因为发生了错误而终止,则AsyncSubject将不会发射任何数据,但是会向Observer传递一个异常通知

    1.5 UnicastSubject

    只允许一个Observer进行监听,在该Obsesrver注册之前会将发射的所有事件放进一个队列中,并在Observer注册的时候将所有事件一起通知给它,如果有多个观察者订阅,那么程序会报错

    1.6 CompletableSubject

    只发送Observer发射完毕的数据,也就是只发送onComplete()

    1.7 MaybeSubject

    主要用于发送一个结果数据,一般用于验证某个结果

    1.8 SingleSubject

    SingleSubject和MaybeSubject的区别不大,只不过SingleSubject没有onComplete()方法和onErrorComplete()方法

    2. Processor

    说完Subject,来说Processor,这是RxJava2的新增功能,是一个接口,继承自Subscriber和Publisher。与Subject作用类似,只不过Processor支持背压

    主要种类如下:AsnycProcessor,BehaviorProcessor,FlowableProcessor,MulticastProcessor,PublishProcessor,ReplayProcessor,UnicastProcessor,内容大部分与Subject相似,不多介绍


    RxJava3.x操作符入门

    RxJava操作符的类型分为创建操作符,变换操作符,过滤操作符,组合操作符,错误处理操作符,辅助操作符,条件和布尔操作符,算数和聚合操作符,而这些操作符类型下又有很多操作符,每个操作符可能还有很多变体。

    1. 创建操作符

    上文我们已经使用的创建操作符create()和just(),这里就不赘述了,除了它们,还有defer、range、interval、start、repeat、timer等创建操作符,下面介绍interval、range、repeat

    1.1 interval

    创建一个按固定时间间隔发射整数序列的Observable,相当于定时器,如下

    Observable.interval(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Throwable {
                    Log.d("MainActivity", "interval:" + aLong.intValue());
                }
            });
    

    于是每隔3s就会调用accept()方法打印log,这里只截取6次


    image.png

    1.2 range

    创建发射指定范围的整数序列的Observable,可以拿来替代for循环,发射一个范围内的有序整数序列,第一个参数是起始值,并且不小于0,第二个参数为终值,区间为左闭右开

    Observable.range(0,5).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("MainActivity", "range:" + integer);
                }
            });
    

    打印log如下


    image.png

    1.3 repeat

    创建一个N次重复发射特定数据的Observable,如下

    Observable.range(0,3).repeat(2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("MainActivity","repeat:" + integer);
                }
            });
    

    重复打印range()里的数字2次


    image.png

    2. 变换操作符

    变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去。变换操作符有map、flatMap、concatMap、switchMap、flatMapIterable、buffer、groupBy、cast、window、scan等,这里介绍map、flatMap、cast、concatMap、flatMapIterable、buffer和groupBy

    2.1 map

    通过指定一个Function对象将源Observable转换为一个新的Observable对象并发射,观察者将收到新的Observable对象并处理,假设我们要访问网络,Host地址时常是变化的,有时是测试服务器地址,有时是正式服务器地址,但是具体界面的URL地址是不变的,因此我们可以用map来进行变换字符操作,这里简单修改一下URL

    final String Host = "http://**********";
            Observable.just(".cn").map(new Function<String, String>() {
    
                @Override
                public String apply(String s) throws Throwable {
                    return Host + s;
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Throwable {
                    Log.d("ChangeActivity", "map:" + s);
                }
            });
    

    打印log如下,加上了 “.cn”


    image.png

    2.2 flatMap、cast

    flatMap操作符将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化地放进一个单独的Observable中。cast操作符地作用是强制将Observable发射的所有数据转换为指定类型的数据。假设我们仍旧访问网络,但是要访问同一个Host的多个界面,我们可以使用for循环在每个界面的URL前添加Host,但是RxJava提供了一个更方便的操作,如下

    final String Host1 = "http://blog.****.net/";
            List<String> mlist = new ArrayList<>();
            mlist.add("itachi86");
            mlist.add("itachi87");
            mlist.add("itachi88");
            Observable.fromIterable(mlist).flatMap(new Function<String, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(String s) throws Throwable {
                    return Observable.just(Host + s);
                }
            }).cast(String.class).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Throwable {
                    Log.d("ChangeActivity", "flatMap:" + s);
                }
            });
    

    打印log如下所示


    image.png

    首先用ArrayList存储要访问的界面URL,然后通过flatMap转换成Observable。cast操作符将Observable中的数据转换为String类型

    注意,flatMap的合并是允许交叉的,也就是说采用flatMap操作符时可能会交错的发送事件,最终结果的顺序可能并不是原始的Observable发送时的顺序

    2.3 concatMap

    concatMap操作符的功能与flatMap操作符一致,不过它解决了flatMap的交叉问题,提供了一种能把发射的值连续在一起的函数而不是合并他们,代码如下,有兴趣可以自己打一下log

    final String Host2 = "http://blog.****.net/";
            Observable.fromIterable(mlist).concatMap(new Function<String, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(String s) throws Throwable {
                    return Observable.just(Host + s);
                }
                //转换为String
            }).cast(String.class).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Throwable {
                    Log.d("ChangeActivity", "concatMap:" + s);
                }
            });
    

    2.4 flatMapIterable

    这个操作符可以将数据包装成Iterable,在Iterable中我们就可以对数据进行处理了,如下

    Observable.just(1,2,3).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
                @Override
                public Iterable<Integer> apply(Integer integer) throws Throwable {
                    List<Integer> mlist = new ArrayList<>();
                    mlist.add(integer + 1);
                    return mlist;
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("ChageActivity", "flatMapIterable:" + integer);
                }
            });
    

    将每个数都加1,打印如下


    image.png

    2.5 buffer

    buffer操作符将源Observable变换为一个新的Observable,新的Observable每次发射一组列表值而不是一个个发射数据。与buffer操作符类似的还有window操作符,只不过window操作符发射的是Observable而不是数据列表

    Observable.just(1,2,3,4,5,6).buffer(3).subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Throwable {
                    for(Integer i : integers){
                        Log.d("ChangeActivity", "buffer:" + i);
                    }
                    Log.d("MainAtivity", "----------------");
                }
            });
    

    buffer的意思是缓存容量为3,打印如下


    image.png

    2.6 groupBy

    groupBy用于分组元素,将源Observable转换成一个发射Observables的新的Observable(分组后的),每一个新的Observable都发射一组指定的数据,那我们先写一个Java Bean,这里为了方便不做封装

    public class People {
        String name;
        String id;
    
        public People(String name, String id) {
            this.name = name;
            this.id = id;
        }
    }
    

    再编写groupBy

    People p1 = new People("周杰伦","A");
            People p2 = new People("张震岳","SS");
            People p3 = new People("刘德华","S");
            People p4 = new People("胡一天","S");
            People p5 = new People("林志颖","A");
            People p6 = new People("鲁迅","SS");
            People p7 = new People("胡适","S");
            People p8 = new People("李大钊","A");
            Observable<GroupedObservable<String, People>> GroupedObservable =
                    Observable.just(p1,p2,p3,p4,p5,p6,p7,p8).groupBy(new Function<People, String>() {
                        @Override
                        public String apply(People people) throws Throwable {
                            return people.id;
                        }
                    });
    
            Observable.concat(GroupedObservable).subscribe(new Consumer<People>() {
                @Override
                public void accept(People people) throws Throwable {
    
                    Log.d("ChangeActivity", "groupBy:" + people.name + "----" + people.id);
                }
            });
    

    这里创建了8个人物,按实力进行划分,groupBy帮助我们对某一个key进行分组,相同的key的值数据放在一起,concat是组合操作符,之后会介绍,打印如下


    image.png

    3. 过滤操作符

    过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足条件的数据,过滤操作符有filter,elementAt,distinct,skip,take,skipLast,takeLast,ignoreElements,throttleFirst,sample,debounce和throttleWithTimeOut等,下面介绍filter,elementAt,distinct,skip,take,ignoreElements,throttleFirst,throttleWithTimeOut

    3.1 filter

    filter操作符会对源Observable产生的结果自定义规则进行过滤,只有满足条件的结果才会提交给订阅者,如下

    Observable.just(1,2,3,4).filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Throwable {
                    return integer > 2;
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("FilterActivity","filter:" + integer);
                }
            });
    

    打印log如下,大于2的数才会被打印出来


    image.png

    3.2 elementAt

    elementAt操作符用来返回指定位置的数据,和它类似的还有elementAtOrDefault(int , T),elementAtOrDefault()可以允许默认值,如下

    Observable.just(1,2,3,4).elementAt(2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("FilterActivity", "elementAt:" + integer);
                }
            });
    

    打印索引为2的数,如下


    image.png

    3.3 distinct

    distinct操作符可以用来去重,只允许还没有发射过的数据项通过,和它类似的还有distinctUntilChanged操作符,distinctUntilChanged用来去掉连续重复的数据

    Observable.just(1,2,2,3,4,1).distinct().subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("FilterActivity", "distinct:" + integer);
                }
            });
    

    打印如下


    image.png

    3.4 skip、take

    skip操作符将源Observable发射的数据过滤掉前n项,而take操作符只取前n项,另外,skiplast和takelast操作符则是从Observable发射的数据的后面进行过滤操作,首先来看skip操作符,如下

    Observable.just(1,2,3,4,5,6).skip(2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("FilterActivity", "skip:" + integer);
                }
            });
    

    打印后几项


    image.png

    再来看take

    //take
            Observable.just(1,2,3,4,5,6).take(2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("FilterActivity", "take:" + integer);
                }
            });
    

    取前几项,如下


    image.png

    3.5 ignoreElements

    ignoreElements操作符忽略所有源Observable产生的结果,把Observable的onComplete()和onError()事件通知订阅者,如下

    Observable.just(1,2,3,4).ignoreElements().subscribe(new CompletableObserver() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onComplete() {
                    Log.d("FilterActivity", "onComplete");
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d("FilterActivity", "onError");
                }
            });
    

    输出如下


    image.png

    3.6 throttleFirst

    throttleFirst操作符会定期发射这个时间段里源Observable发射的第一个数据,throttleFirst操作符默认在computation调度器上执行,和throttleFirst操作符类似的还有sample操作符,sample操作符会定时地发射源Observable最近发射的数据,其他的都会被过滤掉,throttleFirst操作符的使用示例如下所示

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Throwable{
                    for(int i = 0; i < 10; i++){
                        emitter.onNext(i);
                        try{
                            Thread.sleep(100);
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                    emitter.onComplete();
                }
            }).throttleFirst(200, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("FilterActivity", "throttleFirst:" + integer);
                }
            });
    

    每隔100ms发射一个数据,throttleFirst操作符设定的时间为200ms,因此他会发射200ms内的第一个数据,如下


    image.png

    3.7 throttleWithTimeOut

    通过时间来限流,源Observable每次发射出来一个数据后就会进行计时,如果在设定好的时间结束前源Observable有新的数据发射出来,这个数据就会被丢弃,同时throttleWithTimeOut重新开始计时。如果每次都是在计时结束前发射数据,那么这个限流就会走向极端:只会发射最后一个数据。throttleWithTimeOut默认在computation调度器上执行。和throttleWithTimeOut操作符类似的还有deounce操作符,它不仅可以使用时间来进行过滤,还可以根据一个函数来进行限流,throttleWithTimeOut操作符的使用实例如下所示

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Throwable{
                    for(int i = 0; i <10; i++){
                        emitter.onNext(i);
                        int sleep = 100;
                        if(i % 3 == 0){
                            sleep = 300;
                        }
                        try{
                            Thread.sleep(sleep);
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                    emitter.onComplete();
                }
            }).throttleWithTimeout(200, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("FilterActivity", "throttleWithTimeOut:" + integer);
                }
            });
    

    每隔100ms发射一个数据,当发射的数据是3的倍数时,下一个数据就延迟到300ms再发射,这里设定的过滤时间是200ms,也就是说发射间隔小于200ms的数据会被过滤掉,打印结果如下


    image.png

    4. 组合操作符

    组合操作符可以同时处理多个Observable来创建我们所需要的Observable,组合操作符有merge、concat、zip、combineLatest、join和switch等,这里介绍merge、concat、zip和combineLatest

    4.1 merge

    merge操作符可以将多个Observable合并到一个Observable中进行发射,merge可能会让合并的Observable发射的数据交错

    Observable<Integer> obs1 = Observable.just(1,2,3).subscribeOn(Schedulers.io());
            Observable<Integer> obs2 = Observable.just(4,5,6);
            Observable.merge(obs1,obs2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("CombineActivity", "merge:" + integer);
                }
            });
    

    输出结果如下


    image.png

    4.2 concat

    将多个Observable发射的数据进行合并发射,concat严格按照顺序发射数据,前一个Observable没发射完成是不会发射后一个Observable的数据的

    Observable<Integer> obs3 = Observable.just(1,2,3).subscribeOn(Schedulers.io());
            Observable<Integer> obs4 = Observable.just(4,5,6);
            Observable.concat(obs3,obs4).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("CombineActivity", "concat:" + integer);
                }
            });
    

    输出如下


    image.png

    4.3 zip

    zip操作符合并两个或多个Observable发射的数据项,根据指定的函数对Observable执行变换操作,并发射一个新值

    Observable<Integer> obs5 = Observable.just(1,2,3);
            Observable<String> obs6 = Observable.just("a", "b", "c");
            Observable.zip(obs5, obs6, new BiFunction<Integer, String, String>() {
                @Override
                public String apply(Integer integer, String s) throws Throwable{
                    return integer + s;
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Throwable {
                    Log.d("CombineActivity", "zip:" + s);
                }
            });
    

    输出结果如下


    image.png

    4.4 combineLatest

    当两个Observable中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。combineLatest操作符和zip有点类似,只不过zip操作如用于最近未打包的两个Observable,只有当原始Observable中的每一个都发射了一条数据时zip才发射数据,而combineLatest操作符用于最近发射的数据项,在原始的Observable中的任意一个发射了数据时侯继续发射一条数据

    Observable<Integer> obs7 = Observable.just(1,2,3);
            Observable<String> obs8 = Observable.just("a", "b", "c");
            Observable.combineLatest(obs7, obs8, new BiFunction<Integer, String, String>() {
    
                @Override
                public String apply(Integer integer, String s) throws Throwable {
                    return integer + s;
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Throwable {
                    Log.d("CombineActivity", "combineLatest:" + s);
                }
            });
    

    如果其中一个Observable还有数据没有发射,那么combineLatest操作符会将两个Observable最新发射的数据组合在一起,如上。第一个Observable最新的数据是3,然后第二个Observable的数据依次在变,之后把第一个和第二个Observable数据组合在一起,打印如下


    image.png

    5. 辅助操作符

    辅助操作符可以帮助我们更方便的处理Observable,辅助操作符包括delay、do、subscribeOn、observeOn、timeout、meterialize、dematerialize、timeInterval、timestamp和to等,下面介绍delay、do、subscribeOn、observeOn和timeout

    delay

    delay操作符让原始Observabl在发射每项数据之前都暂停一段指定的时间段

    Observable.create(new ObservableOnSubscribe<Long>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Throwable {
                    Long currentTime = System.currentTimeMillis() / 1000;
                    emitter.onNext(currentTime);
                }
            }).delay(2, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Throwable {
                    Log.d("HelpActivity", "delay:" + (System.currentTimeMillis() / 1000 - aLong));
                }
            });
    

    输出结果如下


    image.png

    5.1 do

    do系列操作符就是为原始的Observable的生命周期事件注册一个回调,当Observable的某事件发生时就会调用这些回调,RxJava中有很多do系列操作符,如下

    1. doOnEach:为Observable注册这样一个回调,Observable每发射一项数据就会调用一个回调函数,包括onNext、onError和onComplete
    2. doOnNext:只有执行onNext的时候会被调用
    3. doOnSubscribe:当观察者订阅Observable时就会被调用
    4. doOnError:当Observable异常终止调用onError时会被调用
    5. doOnTerminate:当Observable终止(无论正常终止或异常终止)之前会被调用
    6. finallyDo:当Observable终止(无论正常终止还是异常终止)之后会被调用

    这里拿doOnNext来举例,如下所示

    Observable.just(1,2).doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("HelpActivity","call:" + integer);
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    Log.d("HelpActivity", "onNext:" + integer);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d("HelpActivity", "Error:" + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.d("HelpActivity","onComplete");
                }
            });
    

    输出如下


    image.png

    5.2 subscribeOn、observeOn

    subscribeOn操作符用于指定Observable自身在哪个线程上运行,如果Observable需要执行耗时操作,则一般可以让其在新开的一个子线程上运行。observeOn用来指定Observe所运行的线程,也就是发射出的数据在那个线程上使用。一般情况下会指定其在主线程中运行,这样就可以修改UI,具体如下

    Observable<Integer> obs = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                    Log.d("HelpActivity", "Observable:" + Thread.currentThread().getName());
                    emitter.onNext(1);
                    emitter.onComplete();
                }
            });
    
    obs.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("HelpActivity", "Observer:" + Thread.currentThread().getName());
                }
            });
    

    subscribeOn(Schedulers.newThread())表示Observable运行在新开的线程,observeOn(AndroidSchedulers.mainThread())表示Observable运行在主线程,其中,AndroidSchedulers是RxAndroid库提供的Scheduler

    输出结果如下


    image.png

    5.3 timeout

    如果原始的Observable过了指定的一段时长还没有发射任何数据,则timeout操作符会以一个onError通知来终止这个Observable,或者继续执行一个备用的Observable。timeout有很多变体,这里介绍其中的一种:timeout(long,TimeUnit,Observable)。它在超时时会切换到使用一个你指定的备用Observable,而不是发送错误通知。它默认在computation调度器上执行,具体如下

    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                    for(int i = 0; i < 4; i++){
                        try{
                            Thread.sleep(i * 100);
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            }).timeout(200, TimeUnit.MILLISECONDS, Observable.just(10,11));
            observable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("HelpActivity", "timeout:" + integer);
                }
            });
    
    image.png

    6. 错误处理操作符

    RxJava在错误出现时就会调用观察者的onError()方法将错误分发出去,由观察者自己处理错误,但是如果每个观察者都处理一遍错误的话,工作量就会很大。这可以使用错误处理操作符,错误处理操作符由catch和retry

    6.1 catch

    catch操作符拦截原始Observable的onError通知,将它替换为其他数据项或数据序列,让产生的Observable能够正常终止或者根本不终止

    RxJava将catch实现为3个不同的操作符

    1. onErrorReturn:返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError调用,不会将错误传递给观察者,作为替代,它会发射一个特殊的项并调用观察者的onComplete()方法
    2. onErrorResumeNext:返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError()调用,不会将错误传递给观察者,作为替代,它会发射备用的Observable
    3. onExceptionResumeNext:和onErrorResumeNext类似,onExceptionResumeNext()方法返回一个镜像原有Observable行为的新Observable,不同的是,如果onError()收到的Throwable不是一个Exception,就将错误传递给观察者的onError()方法,而不会使用备用的Observable

    下面举例onErrorReturn操作符

    Observable.create(new ObservableOnSubscribe<Integer>() {
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception{
                    for(int i = 0; i < 5; i++){
                        if(i == 2){
                            emitter.onError(new Throwable("Throwable"));
                        }
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            }).onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    Log.e("MistakeActivity", "在onErrorReturn处理了: " + throwable.toString());
                    return 6;
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    Log.i("MistakeActivity", "onNext:" + integer);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d("MistakeActivity", "onError:" + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.i("MistakeActivity", "onComplete");
                }
            });
    

    输出结果如下


    image.png

    6.2 retry

    retry操作符不会将原始的Observable的onError()通知传递给观察者,观察者会订阅这个Observable,再给这个Observable一次机会来无错误地完成它的数据序列。retry总是传递onNext通知给观察者,由于重新订阅,因此可能会造成数据项重复。RxJava中的实现为retry和retryWhen。这里拿retry(long)举例,retry(long)指定了最多重新订阅的次数。如果次数超了,它不会尝试再次订阅。retry(long)会把最新的一个onError通知传递给自己的观察者,如下

    Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                    for(int i = 0; i < 5; i++){
                        if(i == 1){
                            emitter.onError(new Throwable("Throwable"));
                        }else{
                            emitter.onNext(i);
                        }
                    }
                    emitter.onComplete();
                }
            }).retry(3).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    Log.d("MistakeActivity","onNext:" + integer);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d("MistakeActivity", "onError:" + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.d("MistakeActivity", "onComplete");
                }
            });
    

    上面重新订阅次数为3,在i = 0时调用onNext()方法,此外重试的3次也会调用onNext()方法,这样一共调用4次onNext(),最后调用onError()方法,输出如下


    image.png

    7. 条件操作符和布尔操作符

    条件操作符和布尔操作符可用于根据条件发射或变换Observable,或者对他们做布尔运算。先来了解一下布尔操作符

    布尔操作符

    布尔操作符有all、contains、isEmpty、exists和sequenceEqual。下面介绍前3个操作符

    7.1 all

    all操作符根据一个函数对源Observable发射的所有数据进行判断,最终返回的结果就是这个判断的结果。这个函数使用发射的数据作为参考,内部判断所有的数据是否满足我们定义好的判断条件。满足返回true,否则返回false

    Observable.just(1,2,3).all(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Throwable {
                    Log.d("BoolActivity", "call:" + integer);
                    return integer < 2;
                }
            }).subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Throwable {
                    Log.d("BoolActivity", "accept--" + aBoolean);
                }
            });
    

    输出结果如下


    image.png

    7.2 contains

    contains操作符用来判断源Observable所发射的数据是否包含某一个数据。如果包含返回true。如果源Observable已经结束了却还没有发射这个数据,则返回false

    Observable.just(1,2,3).contains(1).subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Throwable {
                    Log.d("BoolActivity", "contains:" + aBoolean);
                }
            });
    

    输出如下


    image.png

    7.3 isEmpty

    isEmpty操作符用来判断源Observable是否发射过数据,如果发射过数据就返回false,如果源Observable已经结束了却还没有发射这个数据,则返回true

    Observable.just(1,2,3).isEmpty().subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Throwable {
                    Log.d("BoolActivity", "isEmpty:" + aBoolean);
                }
            });
    

    输出如下


    image.png

    条件操作符

    条件操作符有amb、defaultIfEmpty、skipUntil、skipWhile、takeUntil、takeWhile等。这里介绍前两个操作符

    7.4 amb

    amb操作符对于给定的两个或多个Observable,它只发射首先发射数据或通知的那个Observable的所有数据

    Observable.ambArray(Observable.just(1,2,3).delay(2, TimeUnit.SECONDS),Observable.just(4,5,6))
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Throwable {
                            Log.d("ConditionActivity", "amb:"+ integer);
                        }
                    });
    

    第一个Observable延迟2s发射,所以很显然最终只会发射第二个Observable,输出结果如下


    image.png

    7.5 defaultIfEmpty

    发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据,如下

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                    emitter.onComplete();
                }
            }).defaultIfEmpty(3).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    Log.d("ConditionActivity","defaultIfEmpty:" + integer);
                }
            });
    

    这里没有create任何数据,所以发射default的数据3


    image.png

    8. 转换操作符

    转换操作符用来将Observable转换为另一个对象或数据结构,转换操作符有toList、toSortedList、toMap、toMultiMap、getIterator和nest等,这里介绍前3种操作符

    8.1 toList

    将发射多项数据的Observable会为每一项数据调用onNext()方法,toList操作可将该Observable发射的多项数据组合成一个List

    Observable.just(1,2,3).toList().subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Throwable {
                    for(int i : integers){
                        Log.i("ConvertActivity", "toList:" + i);
                    }
                }
            });
    

    输出结果如下


    image.png

    8.2 toSortedList

    类似于toList操作符,不同的是,toSortedList操作符会将对产生的列表排序,默认自然升序。如果发射的数据项没有实现Comparable接口,则会抛出一个异常。当然,若发射的数据项没有实现Comparable接口,则可以使用toSortedList(Func2)的变体,其传递的函数参数可用于比较两个数据项

    Observable.just(3,1,2).toSortedList().subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Throwable {
                    for(int i : integers){
                        Log.i("ConvertActivity", "toSortedList:" + i);
                    }
                }
            });
    

    输出结果如下


    image.png

    8.3 toMap

    操作符将原始的Observable发射的所有数据项收集到一个Map(默认是HashMap)中,然后发射这个Map。你可以提供一个用于生成Map的key的函数,也可以将一个函数转换后的数据项作为Map存储的值(默认情况下数据项本身就是值)

    People p1 = new People("小A","A");
            People p2 = new People("小B","B");
            People p3 = new People("小C","C");
            Observable.just(p1,p2,p3).toMap(new Function<People, String>() {
    
                @Override
                public String apply(People people) throws Throwable {
                    return people.id;
                }
            }).subscribe(new Consumer<Map<String, People>>() {
                @Override
                public void accept(Map<String, People> stringPeopleMap) throws Throwable {
                    Log.i("ConvertActivity", "toMap:" + stringPeopleMap.get("B").name);
                }
            });
    

    打印key值为B的名字


    image.png

    本文摘抄自《进阶之光——刘望舒》,是个人学习路线上的总结,不以盈利为目的。


    欢迎指正。

    相关文章

      网友评论

          本文标题:RxJava3.x学习记录

          本文链接:https://www.haomeiwen.com/subject/micjrltx.html