美文网首页
RxJava 使用总结

RxJava 使用总结

作者: Dapengyou | 来源:发表于2023-01-20 10:33 被阅读0次

Rxjava

角色 作用 类比
Observable(被观察者) 事件的生产者 顾客
Observer(观察者) 事件消费者,接收事件后作出响应 厨师
Subscribe(订阅) 将Observable和Observer连接在一起 服务员
Event(事件) Observable通知Observer的载体 菜品

RxJava 3 主要特点

  • 单一依赖:Reactive-Streams
  • 继续支持Java 6+和Android 2.3+
  • 修复了API错误和RxJava 2的许多限制
  • 旨在替代RxJava 2,具有相对较少的二进制不兼容更改
  • 提供Java 8 lambda友好的API
  • 关于并发源的不同意见
  • 异步或同步执行
  • 参数化并发的虚拟时间和调度程序
  • 为测试schedulers,consumers和plugin hooks提供测试和诊断支持

RxJava 3 与RxJava 2的主要区别是:

  • 将eagerTruncate添加到replay运算符,以便head节点将在截断时丢失它保留的项引用
  • 新增 X.fromSupplier()
  • 使用 Scheduler 添加 concatMap,保证 mapper 函数的运行位置
  • 新增 startWithItem 和 startWithIterable
  • ConnectableFlowable/ConnetableFlowable 重新设计
  • 将 as() 并入 to()
  • 更改 Maybe.defaultIfEmpty() 以返回 Single
  • 用 Supplier 代替 Callable
  • 将一些实验操作符推广到标准
  • 从某些主题/处理器中删除 getValues()
  • 删除 replay(Scheduler) 及其重载
  • 删除 dematerialize()
  • 删除 startWith(T|Iterable)
  • 删除 as()
  • 删除 Maybe.toSingle(T)
  • 删除 Flowable.subscribe(4 args)
  • 删除 Observable.subscribe(4 args)
  • 删除 Single.toCompletable()
  • 删除 Completable.blockingGet()

下面是具体使用

依赖引用

implementation "io.reactivex.rxjava3:rxjava:3.1.5"

Observable(被观察者)的创建

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {

        e.onNext("事件1");
        e.onNext("事件2");
        e.onNext("事件3");
        e.onComplete();
    }
});
  • Observable.create(): 创建一个Observable的最基本方法,也可以通过just()、from()等方法来简化操作。
    • just() 它只能发射整个列表,不会迭代发射整个列表的每个值
    • from() 操作符可以转换Future、Iterable和数组。将 Iterable 和数组转化为单个数据
  • ObservableOnSubscribe<>(): 一个接口,在复写的subscribe()里定义需要发送的事件。
  • ObservableEmitter: 这是RxJava2中新推出的类,可以理解为发射器,用于发射数据onNext()和通知onComplete()/onError()。

Observer(观察者) 的创建:

Observer<String> observer = new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {

        Log.d(TAG, "onSubscribe: 达成订阅");
    }

    @Override
    public void onNext(String s) {

        Log.d(TAG, "onNext: 响应了"+s);
    }

    @Override
    public void onError(Throwable e) {

        Log.d(TAG, "onError: 执行错误");
    }

    @Override
    public void onComplete() {

        Log.d(TAG, "onComplete: 执行完成");
    }
};
  • onNext():普通事件,通过重写进行响应即可。
  • onError():错误事件,当队列中事件处理出现异常时,就会调用该方法,此后不再有事件发出。
  • onComplete():完成事件,当队列中的事件全部处理完成后触发。
    在一个正常的序列中,onError()和onComplete()有且仅有处于事件队列的末尾,并且为互斥关系,即调用了一个就不应该再调用另一个。

形成订阅

通过 subscribe 形成订阅

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {

        e.onNext("事件1");
        e.onNext("事件2");
        e.onNext("事件3");
        e.onComplete();
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

        Log.d(TAG, "onSubscribe: 达成订阅");
    }

    @Override
    public void onNext(String s) {

        Log.d(TAG, "onNext: 响应了"+s);
    }

    @Override
    public void onError(Throwable e) {

        Log.d(TAG, "onError: 执行错误");
    }

    @Override
    public void onComplete() {

        Log.d(TAG, "onComplete: 执行完成");
    }
});

更简洁的代码

如果我们不关心onCompleted()或者onError()的话,那么可以使用一个更简单的类来定义onNext()期间要完成什么功能

Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
};

当我们不关心onCompleted()或者onError()时,为了更简化代码,可以使用以下代码:

Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });

但是这种简便的写法不利于处理异常,所以可能会报 crash,这时再添加一个 new Action1<Throwable>() 能够很好的 catch 住 crash,代码如下:

Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("Error encountered: " + throwable.getMessage());
            }
        });

如果还想实现 onCompleted() 还可以再加一个 new Action0() 用来完成 onCompleted(), 代码如下:

 Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("Error encountered: " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("Sequence complete");
            }
        });

更简便点还可以使用 Lambda:

 Observable.unsafeCreate((Observable.OnSubscribe<String>) subscriber -> {
        System.out.println(subscriber);
        }).subscribe(s -> System.out.println(s), 
                throwable -> System.out.println("Error encountered: " + throwable.getMessage()),
                () -> System.out.println("Sequence complete"));

分配线程

subscribeOn

为上面的内容 分配线程 Schedulers.io()

observeOn

为下面分配线程,一般是主线程 AndroidSchedulers.mainThread()

开发中常用到的操作符

  • Create — 通过调用观察者的方法从头创建一个Observable
  • From — 将其它的对象或数据结构转换为Observable
  • Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
  • Interval — 创建一个定时发射整数序列的Observable
  • Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
  • FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。

例子:使用 map 将 Long 型转化为 String,再使用 FlatMap 将 String 转化为 Long

 Observable.interval(5, TimeUnit.SECONDS)
                .map(new Func1<Long, String>() {
                    @Override
                    public String call(Long aLong) {
                        return String.valueOf(aLong);
                    }
                })
                .flatMap(new Func1<String, Observable<Long>>() {
                    @Override
                    public Observable<Long> call(String aLong) {
                        Observable<Long> observable = Observable.just(Long.parseLong(aLong));
                        return observable;
                    }
                }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long string) {
                            Log.d("test", String.valueOf(string));
                    }
                });

Func vs Action:Func 提供数据,而 Action 消耗数据,Func0 代表没有参数,Func1 有一个参数,以此类推,Action 同理,可以有 N 个参数

操作符列表

下面是常用的操作符列表:

  1. 创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. 变换操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
  3. 过滤操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  4. 组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. 错误处理 Catch和Retry
  6. 辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  7. 条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  8. 算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
  9. 转换操作 To
  10. 连接操作 Connect, Publish, RefCount, Replay
  11. 反压操作,用于增加特殊的流程控制策略的操作符

创建操作

用于创建Observable的操作符

  • Create — 通过调用观察者的方法从头创建一个Observable
  • From — 将其它的对象或数据结构转换为Observable
  • Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable
  • Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
  • Timer — 创建在一个指定的延迟之后发射单个数据的Observable
  • Interval — 创建一个定时发射整数序列的Observable
  • Range — 创建发射指定范围的整数序列的Observable
  • Repeat — 创建重复发射特定的数据或数据序列的Observable
  • Start — 创建发射一个函数的返回值的Observable
  • Empty/Never/Throw — 创建行为受限的特殊Observable

变换操作

这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档

  • Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
  • FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
  • Buffer — 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
  • GroupBy — 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
  • Scan — 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值
  • Window — 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集

过滤操作

这些操作符用于从Observable发射的数据中进行选择

  • Debounce — 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作
  • Distinct — 去重,过滤掉重复数据项
  • ElementAt — 取值,取特定位置的数据项
  • Filter — 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
  • First — 首项,只发射满足条件的第一条数据
  • IgnoreElements — 忽略所有的数据,只保留终止通知(onError或onCompleted)
  • Last — 末项,只发射最后一条数据
  • Sample — 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst
  • Skip — 跳过前面的若干项数据
  • SkipLast — 跳过后面的若干项数据
  • Take — 只保留前面的若干项数据
  • TakeLast — 只保留后面的若干项数据

组合操作

组合操作符用于将多个Observable组合成一个单一的Observable

  • And/Then/When — 通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集
  • CombineLatest — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
  • Join — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
  • Merge — 将两个Observable发射的数据组合并成一个
  • StartWith — 在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项
  • Switch — 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据
  • Zip — 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射

错误处理

这些操作符用于从错误通知中恢复

  • Catch — 捕获,继续序列操作,将错误替换为正常的数据,从onError通知中恢复
  • Retry — 重试,如果Observable发射了一个错误通知,重新订阅它,期待它正常终止

辅助操作

一组用于处理Observable的操作符

  • Delay — 延迟一段时间发射结果数据
  • Do — 注册一个动作占用一些Observable的生命周期事件,相当于Mock某个操作
  • Materialize/Dematerialize — 将发射的数据和通知都当做数据发射,或者反过来
  • ObserveOn — 指定观察者观察Observable的调度程序(工作线程)
  • Serialize — 强制Observable按次序发射数据并且功能是有效的
  • Subscribe — 收到Observable发射的数据和通知后执行的操作
  • SubscribeOn — 指定Observable应该在哪个调度程序上执行
  • TimeInterval — 将一个Observable转换为发射两个数据之间所耗费时间的Observable
  • Timeout — 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知
  • Timestamp — 给Observable发射的每个数据项添加一个时间戳
  • Using — 创建一个只在Observable的生命周期内存在的一次性资源

条件和布尔操作

这些操作符可用于单个或多个数据项,也可用于Observable

  • All — 判断Observable发射的所有的数据项是否都满足某个条件
  • Amb — 给定多个Observable,只让第一个发射数据的Observable发射全部数据
  • Contains — 判断Observable是否会发射一个指定的数据项
  • DefaultIfEmpty — 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
  • SequenceEqual — 判断两个Observable是否按相同的数据序列
  • SkipUntil — 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
  • SkipWhile — 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
  • TakeUntil — 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
  • TakeWhile — 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据

算术和聚合操作

这些操作符可用于整个数据序列

  • Average — 计算Observable发射的数据序列的平均值,然后发射这个结果
  • Concat — 不交错的连接多个Observable的数据
  • Count — 计算Observable发射的数据个数,然后发射这个结果
  • Max — 计算并发射数据序列的最大值
  • Min — 计算并发射数据序列的最小值
  • Reduce — 按顺序对数据序列的每一个应用某个函数,然后返回这个值
  • Sum — 计算并发射数据序列的和

连接操作

一些有精确可控的订阅行为的特殊Observable

  • Connect — 指示一个可连接的Observable开始发射数据给订阅者
  • Publish — 将一个普通的Observable转换为可连接的
  • RefCount — 使一个可连接的Observable表现得像一个普通的Observable
  • Replay — 确保所有的观察者收到同样的数据序列,即使他们在Observable开始发射数据之后才订阅

转换操作

  • To — 将Observable转换为其它的对象或数据结构
  • Blocking 阻塞Observable的操作符

相关文章

  • RxJava使用总结

    原博客地址 前言 RxJava出来已经很久了, 但我至今对它仍然不是很熟悉, 今天就系统的总结一下RxJava的常...

  • RxJava2 源码分析一

    文章目录 前言 RxJava2 介绍 RxJava2 使用 带问题看源码 总结 前言 在OkHttp3+Retro...

  • Android框架——RxJava(一)概述与基本使用

    RxJava(一)概述与基本使用 RxJava学习系列: RxJava(一)概述与基本使用 [RxJava(二)创...

  • RxJava的使用总结

    RxJava简介 RxJava的github地址https://github.com/ReactiveX/RxJa...

  • RxJava2 + Retrofit2的简单使用

    RxJava定义:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序库总结:RxJava 是一...

  • RxJava2 使用解析——常见的使用场景

    RxJava——目前最热门的响应式函数编程框架。本文主要总结了笔者在项目中使用到的RxJava的场景,部分例子参考...

  • RxJava

    其它文章 RxJava操作符大全 1、RxJava之一——一次性学会使用RxJava RxJava简单的使用和使用...

  • RxJava初探

    我们在学习RxJava之前要了解一下,为什么使用RxJava, 使用RxJava有什么好处 RxJava特性: 轻...

  • RxJava + Retrofit 简单使用

    RxJava接入 RxJava 简单用法 Retrofit 简单使用 RxJava + Retrofit RxJa...

  • RxJava2.0使用总结

    这里是我对Rxjava2.0的一些学习总结,在此记录一下,以后用的时候也方便查找。 如何使用RxJava2 在An...

网友评论

      本文标题:RxJava 使用总结

      本文链接:https://www.haomeiwen.com/subject/wchchdtx.html