Rxjava
角色 | 作用 | 类比 |
---|---|---|
Observable(被观察者) | 事件的生产者 | 顾客 |
Observer(观察者) | 事件消费者,接收事件后作出响应 | 厨师 |
Subscribe(订阅) | 将Observable和Observer连接在一起 | 服务员 |
Event(事件) | Observable通知Observer的载体 | 菜品 |
RxJava 3 主要特点
- 单一依赖:Reactive-Streams
- 继续支持Java 6+和Android 2.3+
- 修复了API错误和RxJava 2的许多限制
- 旨在替代RxJava 2,具有相对较少的二进制不兼容更改
- 提供Java 8 lambda友好的API
- 关于并发源的不同意见
- 异步或同步执行
- 参数化并发的虚拟时间和调度程序
- 为测试schedulers,consumers和plugin hooks提供测试和诊断支持
RxJava 3 与RxJava 2的主要区别是:
- 将eagerTruncate添加到replay运算符,以便head节点将在截断时丢失它保留的项引用
- 新增 X.fromSupplier()
- 使用 Scheduler 添加 concatMap,保证 mapper 函数的运行位置
- 新增 startWithItem 和 startWithIterable
- ConnectableFlowable/ConnetableFlowable 重新设计
- 将 as() 并入 to()
- 更改 Maybe.defaultIfEmpty() 以返回 Single
- 用 Supplier 代替 Callable
- 将一些实验操作符推广到标准
- 从某些主题/处理器中删除 getValues()
- 删除 replay(Scheduler) 及其重载
- 删除 dematerialize()
- 删除 startWith(T|Iterable)
- 删除 as()
- 删除 Maybe.toSingle(T)
- 删除 Flowable.subscribe(4 args)
- 删除 Observable.subscribe(4 args)
- 删除 Single.toCompletable()
- 删除 Completable.blockingGet()
下面是具体使用
依赖引用
implementation "io.reactivex.rxjava3:rxjava:3.1.5"
Observable(被观察者)的创建
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
e.onComplete();
}
});
- Observable.create(): 创建一个Observable的最基本方法,也可以通过just()、from()等方法来简化操作。
- just() 它只能发射整个列表,不会迭代发射整个列表的每个值
- from() 操作符可以转换Future、Iterable和数组。将 Iterable 和数组转化为单个数据
- ObservableOnSubscribe<>(): 一个接口,在复写的subscribe()里定义需要发送的事件。
- ObservableEmitter: 这是RxJava2中新推出的类,可以理解为发射器,用于发射数据onNext()和通知onComplete()/onError()。
Observer(观察者) 的创建:
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: 达成订阅");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: 响应了"+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: 执行错误");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: 执行完成");
}
};
- onNext():普通事件,通过重写进行响应即可。
- onError():错误事件,当队列中事件处理出现异常时,就会调用该方法,此后不再有事件发出。
- onComplete():完成事件,当队列中的事件全部处理完成后触发。
在一个正常的序列中,onError()和onComplete()有且仅有处于事件队列的末尾,并且为互斥关系,即调用了一个就不应该再调用另一个。
形成订阅
通过 subscribe 形成订阅
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: 达成订阅");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: 响应了"+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: 执行错误");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: 执行完成");
}
});
更简洁的代码
如果我们不关心onCompleted()或者onError()的话,那么可以使用一个更简单的类来定义onNext()期间要完成什么功能
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
};
当我们不关心onCompleted()或者onError()时,为了更简化代码,可以使用以下代码:
Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
但是这种简便的写法不利于处理异常,所以可能会报 crash,这时再添加一个 new Action1<Throwable>() 能够很好的 catch 住 crash,代码如下:
Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error encountered: " + throwable.getMessage());
}
});
如果还想实现 onCompleted() 还可以再加一个 new Action0() 用来完成 onCompleted(), 代码如下:
Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error encountered: " + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
});
更简便点还可以使用 Lambda:
Observable.unsafeCreate((Observable.OnSubscribe<String>) subscriber -> {
System.out.println(subscriber);
}).subscribe(s -> System.out.println(s),
throwable -> System.out.println("Error encountered: " + throwable.getMessage()),
() -> System.out.println("Sequence complete"));
分配线程
subscribeOn
为上面的内容 分配线程 Schedulers.io()
observeOn
为下面分配线程,一般是主线程 AndroidSchedulers.mainThread()
开发中常用到的操作符
-
Create
— 通过调用观察者的方法从头创建一个Observable -
From
— 将其它的对象或数据结构转换为Observable -
Just
— 将对象或者对象集合转换为一个会发射这些对象的Observable -
Interval
— 创建一个定时发射整数序列的Observable -
Map
— 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项 -
FlatMap
— 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
例子:使用 map 将 Long 型转化为 String,再使用 FlatMap 将 String 转化为 Long
Observable.interval(5, TimeUnit.SECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return String.valueOf(aLong);
}
})
.flatMap(new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String aLong) {
Observable<Long> observable = Observable.just(Long.parseLong(aLong));
return observable;
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Long>() {
@Override
public void call(Long string) {
Log.d("test", String.valueOf(string));
}
});
Func vs Action:Func 提供数据,而 Action 消耗数据,Func0 代表没有参数,Func1 有一个参数,以此类推,Action 同理,可以有 N 个参数
操作符列表
下面是常用的操作符列表:
- 创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
- 变换操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
- 过滤操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
- 组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
- 错误处理 Catch和Retry
- 辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
- 条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
- 算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
- 转换操作 To
- 连接操作 Connect, Publish, RefCount, Replay
- 反压操作,用于增加特殊的流程控制策略的操作符
创建操作
用于创建Observable的操作符
-
Create
— 通过调用观察者的方法从头创建一个Observable -
From
— 将其它的对象或数据结构转换为Observable -
Defer
— 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable -
Just
— 将对象或者对象集合转换为一个会发射这些对象的Observable -
Timer
— 创建在一个指定的延迟之后发射单个数据的Observable -
Interval
— 创建一个定时发射整数序列的Observable -
Range
— 创建发射指定范围的整数序列的Observable -
Repeat
— 创建重复发射特定的数据或数据序列的Observable -
Start
— 创建发射一个函数的返回值的Observable -
Empty/Never/Throw
— 创建行为受限的特殊Observable
变换操作
这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档
-
Map
— 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项 -
FlatMap
— 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。 -
Buffer
— 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个 -
GroupBy
— 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据 -
Scan
— 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值 -
Window
— 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集
过滤操作
这些操作符用于从Observable发射的数据中进行选择
-
Debounce
— 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作 -
Distinct
— 去重,过滤掉重复数据项 -
ElementAt
— 取值,取特定位置的数据项 -
Filter
— 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的 -
First
— 首项,只发射满足条件的第一条数据 -
IgnoreElements
— 忽略所有的数据,只保留终止通知(onError或onCompleted) -
Last
— 末项,只发射最后一条数据 -
Sample
— 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst -
Skip
— 跳过前面的若干项数据 -
SkipLast
— 跳过后面的若干项数据 -
Take
— 只保留前面的若干项数据 -
TakeLast
— 只保留后面的若干项数据
组合操作
组合操作符用于将多个Observable组合成一个单一的Observable
-
And/Then/When
— 通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集 -
CombineLatest
— 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果 -
Join
— 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射 -
Merge
— 将两个Observable发射的数据组合并成一个 -
StartWith
— 在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项 -
Switch
— 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据 -
Zip
— 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射
错误处理
这些操作符用于从错误通知中恢复
辅助操作
一组用于处理Observable的操作符
-
Delay
— 延迟一段时间发射结果数据 -
Do
— 注册一个动作占用一些Observable的生命周期事件,相当于Mock某个操作 -
Materialize/Dematerialize
— 将发射的数据和通知都当做数据发射,或者反过来 -
ObserveOn
— 指定观察者观察Observable的调度程序(工作线程) -
Serialize
— 强制Observable按次序发射数据并且功能是有效的 -
Subscribe
— 收到Observable发射的数据和通知后执行的操作 -
SubscribeOn
— 指定Observable应该在哪个调度程序上执行 -
TimeInterval
— 将一个Observable转换为发射两个数据之间所耗费时间的Observable -
Timeout
— 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知 -
Timestamp
— 给Observable发射的每个数据项添加一个时间戳 -
Using
— 创建一个只在Observable的生命周期内存在的一次性资源
条件和布尔操作
这些操作符可用于单个或多个数据项,也可用于Observable
-
All
— 判断Observable发射的所有的数据项是否都满足某个条件 -
Amb
— 给定多个Observable,只让第一个发射数据的Observable发射全部数据 -
Contains
— 判断Observable是否会发射一个指定的数据项 -
DefaultIfEmpty
— 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据 -
SequenceEqual
— 判断两个Observable是否按相同的数据序列 -
SkipUntil
— 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据 -
SkipWhile
— 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据 -
TakeUntil
— 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知 -
TakeWhile
— 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据
算术和聚合操作
这些操作符可用于整个数据序列
-
Average
— 计算Observable发射的数据序列的平均值,然后发射这个结果 -
Concat
— 不交错的连接多个Observable的数据 -
Count
— 计算Observable发射的数据个数,然后发射这个结果 -
Max
— 计算并发射数据序列的最大值 -
Min
— 计算并发射数据序列的最小值 -
Reduce
— 按顺序对数据序列的每一个应用某个函数,然后返回这个值 -
Sum
— 计算并发射数据序列的和
连接操作
一些有精确可控的订阅行为的特殊Observable
-
Connect
— 指示一个可连接的Observable开始发射数据给订阅者 -
Publish
— 将一个普通的Observable转换为可连接的 -
RefCount
— 使一个可连接的Observable表现得像一个普通的Observable -
Replay
— 确保所有的观察者收到同样的数据序列,即使他们在Observable开始发射数据之后才订阅
网友评论