都是google翻译,虽然有些地方翻译的让人费解。但是我也没有使用我的理解。怕翻译的不好,误导别人。 不懂的地方,自己领悟吧!
Hello World
二是编写Hello World程序:
package rxjava.examples;
import io.reactivex.rxjava3.core.*;
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}
请注意,RxJava 3 组件现在io.reactivex.rxjava3
位于io.reactivex.rxjava3.core
.
Base classes
RxJava 3 具有几个基类,您可以在其上发现运算符:
-
io.reactivex.rxjava3.core.Flowable
: 0..N 流,支持 Reactive-Streams 和背压 -
io.reactivex.rxjava3.core.Observable
: 0..N 流量,无背压, -
io.reactivex.rxjava3.core.Single
: a 恰好 1 个项目的流或错误, -
io.reactivex.rxjava3.core.Completable
: 没有项目但只有完成或错误信号的流程, -
io.reactivex.rxjava3.core.Maybe
: a 一个没有项目的流程,只有一个项目或一个错误。
Some terminology
Upstream, downstream
RxJava 中的数据流由一个源、零个或多个中间步骤组成,然后是数据使用者或组合器步骤(该步骤负责以某种方式消费数据流):
source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
在这里,如果我们想象自己在 上operator2
,向左看源头被称为上游。向右看订阅者/消费者被称为下游。当每个元素都写在单独的行上时,这通常更明显:
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
Objects in motion
在 RxJava 的文档中,emission
、emits
、item
、event
、signal
、data
和message
被认为是同义词,代表沿着数据流传播的对象。
Backpressure
当数据流通过异步步骤运行时,每个步骤可能以不同的速度执行不同的事情。为了避免压倒这些步骤,这通常表现为由于临时缓冲或需要跳过/删除数据而增加的内存使用量,应用了所谓的背压,这是一种流程控制形式,其中步骤可以表达多少项目他们准备好处理了吗?这允许在步骤通常无法知道上游将向其发送多少项的情况下限制数据流的内存使用。
在 RxJava 中,专用Flowable
类用于支持背压,Observable
专用于非背压操作(短序列、GUI 交互等)。其他类型Single
,Maybe
并且Completable
不支持背压也不应; 总是有空间临时存放一件物品。
Assembly time
通过应用各种中间运算符来准备数据流发生在所谓的assembly time:
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;
此时,数据还没有流动,也没有发生任何副作用。
Subscription time
这是在subscribe()
内部建立处理步骤链的流程上调用时的临时状态:
flow.subscribe(System.out::println)
这是触发订阅副作用的时间(请参阅 参考资料doOnSubscribe
)。在这种状态下,某些来源会立即阻止或开始发射物品。
Runtime
这是流主动发出项目、错误或完成信号时的状态:
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
实际上,这是上面给定示例的主体执行的时间。
Simple background computation
RxJava 的常见用例之一是在后台线程上运行一些计算、网络请求,并在 UI 线程上显示结果(或错误):
import io.reactivex.rxjava3.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
这种链接方法的风格称为流畅的 API,类似于构建器模式。然而,RxJava 的响应式类型是不可变的;每个方法调用都会返回一个Flowable
具有添加行为的新方法。为了说明,这个例子可以改写如下:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
通常,您可以通过 将计算或阻塞 IO 移动到某个其他线程subscribeOn
。数据准备好后,您可以确保它们在前台或 GUI 线程上通过observeOn
.
Schedulers
RxJava 运算符不直接与Threads
或ExecutorServices
一起工作,而是与所谓的Schedulers
一起工作,这些s抽象了统一 API 背后的并发源。RxJava 3 具有多个可通过Schedulers
实用程序类访问的标准调度程序。
-
Schedulers.computation()
: 在后台在固定数量的专用线程上运行计算密集型工作。大多数异步运算符使用它作为它们的默认值Scheduler
。 -
Schedulers.io()
:在一组动态变化的线程上运行类似 I/O 或阻塞的操作。 -
Schedulers.single()
: 以顺序和先进先出的方式在单个线程上运行工作。 -
Schedulers.trampoline()
: 在其中一个参与线程中以顺序和 FIFO 方式运行工作,通常用于测试目的。
这些在所有 JVM 平台上都可用,但某些特定平台(例如 Android)Schedulers
定义了自己的典型:AndroidSchedulers.mainThread()
, SwingScheduler.instance()
or JavaFXSchedulers.gui()
.
此外,还有一个选项可以将现有Executor
(及其子类型,例如ExecutorService)包装到Schedulervia
中Schedulers.from(Executor)
。例如,这可以用于拥有更大但仍然固定的线程池(分别不同于computation()
和io()
)。
最后Thread.sleep(2000)
;的结局绝非偶然。在 RxJava 中,默认Schedulers
在守护线程上运行,这意味着一旦 Java 主线程退出,它们都会停止并且后台计算可能永远不会发生。在此示例情况下休眠一段时间可以让您在有空闲时间的情况下在控制台上看到流的输出。
Concurrency within a flow
RxJava 中的流程本质上是顺序的,分为可以相互并发运行的处理阶段:
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
此示例流程在计算中 对从 1 到 10 的数字进行平方,Scheduler
并在“主”线程(更准确地说,是的调用者线程blockingSubscribe
)上使用结果。但是,v -> v * v
对于此流程,lambda不会并行运行;它在同一个计算线程上一个接一个地接收值 1 到 10。
Parallel processing
并行处理数字 1 到 10 有点复杂:
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
实际上,RxJava 中的并行意味着运行独立的流程并将它们的结果合并回单个流程。运算符flatMap首先将 1 到 10 的每个数字映射到它自己的个体Flowable,运行它们并合并计算出的平方。
但是请注意,这flatMap并不能保证任何顺序,并且来自内部流的项目最终可能会交错。有替代运算符:
-
concatMap
一次映射并运行一个内部流 -
concatMapEager
它“一次”运行所有内部流,但输出流将按照创建这些内部流的顺序。
或者,Flowable.parallel()
运算符和ParallelFlowable
类型有助于实现相同的并行处理模式:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
Dependent sub-flows
flatMap
是一个强大的运算符,在很多情况下都有帮助。例如,给定一个返回 a 的服务Flowable
,我们想使用第一个服务发出的值调用另一个服务:
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
.flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
.map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
.subscribe(System.out::println);
Continuations
有时,当一项可用时,人们希望对其执行一些相关计算。这有时称为延续,根据应该发生的情况和涉及的类型,可能涉及各种操作符来完成。
Dependent
最典型的场景是给定一个值,调用另一个服务,等待并继续其结果:
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
后面的序列也经常需要来自早期映射的值。这可以通过将外部移动flatMap
到前面的内部部分来实现flatMap
,例如:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
在这里,原始文件value
将在内部可用flatMap
,由 lambda 变量捕获提供。
Non-dependent
在其他情况下,第一个源/数据流的结果无关紧要,人们希望继续使用准独立的另一个源。在这里,flatMap
也有效:
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
但是,在这种情况下,继续保留Observable
而不是可能更合适的Single
. (这是可以理解的,因为从 的角度来看flatMapSingle
,sourceObservable
是一个多值源,因此映射也可能导致多个值)。
通常,虽然有一种方式更具表现力(并且开销也更低),通过使用Completable
作为中介及其操作符andThen
来恢复其他东西:
sourceObservable
.ignoreElements() // returns Completable 忽略所有源Observable产生的结果,只会执行onCpmpleted()或者onError()方法
.andThen(someSingleSource)//然后
.map(v -> v.toString())
sourceObservable
和 someSingleSource
之间的唯一区别是前者应该正常完成以便被后者消耗。
Deferred-dependent
有时,前一个序列和新序列之间存在隐含的数据依赖关系,由于某种原因,它没有流经“常规通道”。人们倾向于将这样的延续写成如下:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
不幸的是,这会打印,0
因为在数据流尚未运行时Single.just(count.get())
在组装时进行评估。我们需要一些东西来推迟对这个Single
源的评估,直到主源完成的运行时:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
or
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
Type conversions
有时,源或服务返回的类型与应该使用它的流不同。例如,在上面的清单示例中,getDemandAsync
可以返回Single<DemandRecord>
. 如果代码示例保持不变,这将导致编译时错误(但是,通常带有关于缺少重载的误导性错误消息)。
在这种情况下,通常有两个选项可以修复转换:1) 转换为所需类型或 2) 查找并使用支持不同类型的特定运算符的重载。
Converting to the desired type
每个反应式基类都具有可以执行此类转换(包括协议转换)以匹配其他类型的运算符。以下矩阵显示了可用的转换选项:
Flowable | Observable | Single | Maybe | Completable | |
---|---|---|---|---|---|
Flowable | toObservable |
first , firstOrError , single , singleOrError , last , lastOrError 1
|
firstElement , singleElement , lastElement
|
ignoreElements |
|
Observable |
toFlowable 2
|
first , firstOrError , single , singleOrError , last , lastOrError 1
|
firstElement , singleElement , lastElement
|
ignoreElements |
|
Single |
toFlowable 3
|
toObservable |
toMaybe |
ignoreElement |
|
Maybe |
toFlowable 3
|
toObservable |
toSingle |
ignoreElement |
|
Completable | toFlowable |
toObservable |
toSingle |
toMaybe |
1: 将多值源转换为单值源时,应决定应将众多源值中的哪一个视为结果。
2:将 Observable
变成Flowable
需要额外的决定:如何处理源的潜在无约束流Observable
?有几种策略可用(例如缓冲、丢弃、保持最新)通过BackpressureStrategy
参数或通过标准Flowable运算符,例如onBackpressureBuffer
, onBackpressureDrop
,onBackpressureLatest
这也允许进一步自定义背压行为。
3: 当只有(最多)一个源项时,背压没有问题,因为它可以一直存储,直到下游准备好消费。
Using an overload with the desired type
许多经常使用的运算符都有可以处理其他类型的重载。这些通常以目标类型的后缀命名:
Operator | Overloads |
---|---|
flatMap |
flatMapSingle , flatMapMaybe , flatMapCompletable , flatMapIterable
|
concatMap |
concatMapSingle , concatMapMaybe , concatMapCompletable , concatMapIterable
|
switchMap |
switchMapSingle , switchMapMaybe , switchMapCompletable
|
这些运算符具有后缀而不是简单地具有不同签名的相同名称的原因是类型擦除。Java 不考虑诸如 operator(Function<T, Single<R>>)
和 operator(Function<T, Maybe<R>>)
不同的签名(与 C# 不同),并且由于擦除,这两个operators
最终将成为具有相同签名的重复方法。
Operator naming conventions
编程中的命名是最困难的事情之一,因为名称应该不长、不具表现力、易于理解且易于记忆。不幸的是,目标语言(和预先存在的约定)在这方面可能不会提供太多帮助(无法使用的关键字、类型擦除、类型歧义等)。
Unusable keywords
在原始的 Rx.NET 中,发出单个项目然后完成的运算符称为Return(T)
。由于 Java 约定是以小写字母开头的方法名称,这本来return(T)
是 Java 中的关键字,因此不可用。因此,RxJava 选择将这个操作符命名为just(T)
。Switch对于必须命名的操作员也存在同样的限制switchOnNext
。另一个例子是Catch名为onErrorResumeNext
.
Type erasure
许多期望用户提供一些返回响应式类型的函数的运算符无法重载,因为围绕 a 的类型擦除Function<T, X>
会将此类方法签名变成重复项。RxJava 选择通过附加类型作为后缀来命名这些运算符:
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
Type ambiguities
即使某些操作符在类型擦除方面没有问题,但它们的签名可能会变得不明确,尤其是在使用 Java 8 和 lambdas 时。例如,有几种concatWith
将各种其他反应式基类型作为参数的重载(为了在底层实现中提供便利和性能优势):
Flowable<T> concatWith(Publisher<? extends T> other);
Flowable<T> concatWith(SingleSource<? extends T> other);
两者Publisher
和SingleSource
显示为功能接口(类型的具有一个抽象方法),并可以鼓励用户尝试提供lambda表达式:
someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);
不幸的是,这种方法不起作用,并且该示例根本无法打印2
。事实上,从 2.1.10 版本开始,它甚至无法编译,因为至少concatWith
存在4 个重载并且编译器发现上面的代码不明确。
在这种情况下,用户可能想要推迟一些计算直到someSource
完成,因此正确的明确运算符应该是defer
:
someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);
有时,添加后缀是为了避免可能编译但在流中产生错误类型的逻辑歧义:
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> mergeArray(Publisher<? extends T>... sources);
当函数接口类型作为类型参数涉及时,这也可能变得不明确 T
.
Error handling
数据流可能会失败,此时错误会发送给消费者。但有时,多个源可能会失败,此时可以选择是等待所有源完成还是失败。为了表明这个机会,许多运算符名称都以DelayError
单词为后缀(而其他运算符在其重载之一中具有delayError
或delayErrors
布尔标志):
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
当然,各种后缀可能会一起出现:
Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
Base class vs base type
由于基类上的静态方法和实例方法数量众多,因此可以认为它们很重。RxJava 3 的设计深受Reactive Streams规范的影响,因此,该库为每个反应类型提供了一个类和一个接口:
Type | Class | Interface | Consumer |
---|---|---|---|
0..N backpressured | Flowable |
Publisher 1
|
Subscriber |
0..N unbounded | Observable |
ObservableSource 2
|
Observer |
1 element or error | Single |
SingleSource |
SingleObserver |
0..1 element or error | Maybe |
MaybeSource |
MaybeObserver |
0 element or error | Completable |
CompletableSource |
CompletableObserver |
1 org.reactivestreams.Publisher
是外部反应流库的一部分。它是通过由Reactive Streams 规范管理的标准化机制与其他反应库交互的主要类型。
2接口的命名约定是附加Source
到半传统的类名。没有FlowableSource
因为PublisherReactive Streams
库提供了(并且子类型化它也不会帮助互操作)。然而,这些接口不是 Reactive Streams 规范意义上的标准,目前仅是 RxJava 特定的。
R8 and ProGuard settings
默认情况下,RxJava 本身不需要任何 ProGuard/R8 设置,应该可以正常工作。不幸的是,自 1.0.3 版以来的 Reactive Streams 依赖项在其 JAR 中嵌入了 Java 9 类文件,这可能会导致普通 ProGuard 出现警告:
Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher
建议-dontwarn
在应用程序proguard-ruleset
文件中设置以下条目:
-dontwarn java.util.concurrent.Flow*
对于R8,RxJava jar包含META-INF/proguard/rxjava3.pro
,其中包含相同的no warning子句,应该自动应用。
网友评论