原文地址:https://projectreactor.io/docs/core/release/reference/index.html#intro-reactive
- Introduction to Reactive Programming
反应式编程简介
Reactor是Reactive Programming范式的实现,可以总结如下:
反应式编程是一种异步编程范式,它涉及数据流和变化的传播。 这意味着可以通过采用的编程语言轻松地表达静态(例如数组)
或动态(例如事件发射器)数据流。
https://en.wikipedia.org/wiki/Reactive_programming
作为响应式编程方向上的第一步,Microsoft在.NET生态系统中创建了响应式扩展(Rx)库。 然后RxJava在JVM上实现了反应式编程。
随着时间的流逝,通过Reactive Streams的努力出现了Java的规范,该规范定义了JVM上的响应库的一组接口和交互规则。
它的接口已在Flow类下集成到Java 9中。
反应式编程范例通常以面向对象的语言表示,作为Observer设计模式的扩展。 您还可以将主要的反应流模式与熟悉的Iterator设计模式进行比较,
因为所有这些库中的Iterable-Iterator对都有双重性。 一个主要的区别是,虽然Iterator是基于pull的,但是反应流却是基于push的。
使用迭代器是命令式编程模式,即使访问值的方法仅由Iterable负责。 确实,开发人员可以选择何时访问序列中的next()项。
在反应式流中,上述对等效于Publisher-Subscriber。 但是是发布者在新可用值出现时通知订阅者,而此推送方面是做出反应的关键。
同样,应用于推入值的操作以声明方式而不是命令方式表示:程序员表示计算的逻辑,而不是描述其确切的控制流程。
除了推送值之外,还以明确定义的方式涵盖了错误处理和完成方面。 发布者可以将新值推送到其订阅者(通过调用onNext),
但也可以发出错误信号(通过调用onError)或完成(通过调用onComplete)。 错误和完成都会终止序列。
可以总结如下:
onNext x 0..N [onError | onComplete]
这种方法非常灵活。 该模式支持没有值,一个值或n个值(包括无限个值序列,例如时钟的连续滴答声)的用例。
但是为什么我们首先需要这样的异步反应式库?
- Blocking Can Be Wasteful
现代应用程序可以吸引大量的并发用户,即使现代硬件的功能不断提高,现代软件的性能仍然是关键问题。
广义上讲,有两种方法可以提高程序的性能:
并行使用更多线程和更多硬件资源
在使用现有资源方面寻求更高的效率
通常,Java开发人员通过使用阻塞代码来编写程序。 除非存在性能瓶颈,否则这种做法很好。 然后是时候引入其他线程,运行类似的阻塞代码了。
但是这种资源利用的扩展会迅速引入竞争和并发问题。
更糟糕的是,阻塞会浪费资源。 如果仔细观察,程序一旦遇到一些延迟(特别是I / O,例如数据库请求或网络调用),就会浪费资源,
因为线程(可能有很多线程)现在处于空闲状态,等待数据。
因此,并行化方法不是灵丹妙药。 有必要访问硬件的全部功能,但是推理和资源浪费也很复杂。
2.Asynchronicity to the Rescue?
前面提到的第二种方法,寻求更高的效率,可以解决资源浪费的问题。 通过编写异步的非阻塞代码,
您可以将执行切换到使用相同基础资源的另一个活动任务,并在异步处理完成后返回到当前进程。
但是如何在JVM上生成异步代码? Java提供了两种异步编程模型:
Callbacks:异步方法没有返回值,但是带有一个额外的回调参数(lambda或匿名类),该参数在结果可用时被调用。
一个著名的例子是Swing的EventListener层次结构。
Futures:异步方法立即返回Future <T>。 异步过程计算T值,但是Future对象包装对其的访问。 该值不是立即可用的,
并且可以轮询该对象,直到该值可用为止。 例如,运行Callable <T>任务的ExecutorService使用Future对象。
这些技术够好吗? 并非针对每个用例,这两种方法都有局限性。
回调很难组合在一起,迅速导致难以阅读和维护的代码(称为“回调地狱”)。
考虑一个示例:在用户界面上显示用户的前五个收藏夹,如果没有收藏夹则显示建议。
这需要三项服务(一项提供喜欢的ID,第二项获取喜欢的详细信息,第三项提供带有详细信息的建议),
如下所示:
例子5.回调地狱的例子
userService.getFavorites(userId, new Callback<List<String>>() { //1
public void onSuccess(List<String> list) { //2
if (list.isEmpty()) { //3
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) { //4
UiUtils.submitOnUiThread(() -> { //5
list.stream()
.limit(5)
.forEach(uiList::show); //6
});
}
public void onError(Throwable error) { //7
UiUtils.errorPopup(error);
}
});
} else {
list.stream() //8
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId, //9
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
1.我们有基于回调的服务:一种回调接口,其中的一种方法在异步过程成功时被调用,而在发生错误时被调用。
2.第一个服务使用收藏夹ID列表调用其回调。
3.如果该列表为空,则必须转到RecommendationionService。
4.RecommendationionService将List <Favorite>提供给第二个回调。
5.由于我们处理的是UI,因此我们需要确保使用的代码在UI线程中运行。
6.我们使用Java 8 Stream将处理的建议数限制为五个,并在UI的图形列表中显示它们。
7.在每个级别,我们以相同的方式处理错误:在弹出窗口中显示它们。
8.返回收藏夹ID级别。 如果该服务返回了完整列表,则需要转到favoriteService获取详细的Favorite对象。
由于我们只需要五个,因此我们首先传输ID列表以将其限制为五个。
9.再一次,回调。 这次,我们得到了一个完整的“收藏夹”对象,该对象被推送到UI线程中的UI。
那是很多代码,很难遵循并且包含重复的部分。 考虑它在Reactor中的等效功能:
示例6.与回调代码等效的Reactor代码示例:
userService.getFavorites(userId) //1
.flatMap(favoriteService::getDetails) //2
.switchIfEmpty(suggestionService.getSuggestions()) //3
.take(5) //4
.publishOn(UiUtils.uiThreadScheduler()) //5
.subscribe(uiList::show, UiUtils::errorPopup); //6
1.我们从收藏夹ID的流开始。
2.我们将它们异步转换为详细的收藏夹对象(flatMap)。 现在,我们有一个收藏夹流。
3.如果“收藏夹”的流为空,则可以通过RecommendationionService切换到后备。
4.我们最多只对结果流中的五个元素感兴趣。
5.最后,我们要处理UI线程中的每个数据。
6.我们通过描述如何处理数据的最终形式(在UI列表中显示)以及发生错误的情况(显示弹出窗口)来触发流程。
如果要确保在少于800毫秒的时间内检索喜欢的ID,或者如果花费更长的时间从缓存中获取它们,该怎么办?
在基于回调的代码中,这是一项复杂的任务。 在Reactor中,就像在链中添加超时操作符一样容易,如下所示:
例子7.具有超时和回退的Reactor代码的例子
userService.getFavorites(userId)
.timeout(Duration.ofMillis(800)) //1
.onErrorResume(cacheService.cachedFavoritesFor(userId)) //2
.flatMap(favoriteService::getDetails) //3
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
1.如果以上部分的发射时间超过800ms,则传播错误
2.如果发生错误,请退回到cacheService。
3.链的其余部分与前面的示例相似。
Future比回调要好一些,但是尽管CompletableFuture对Java 8进行了改进,但它们在组合方面仍然表现不佳。
一起编排多个Future对象是可行的,但并不容易。 另外,未来还有其他问题:
通过调用get()方法很容易以Future对象结束另一个阻塞情况。
它们不支持惰性计算。
他们缺乏对多个值和高级错误处理的支持。
再看一个例子:我们得到一个ID列表,我们要从中获取一个名称和一个统计信息,并将它们成对组合,所有这些信息都是异步的。
下面的示例使用CompletableFuture类型的列表来执行此操作
Example 8. Example of CompletableFuture combination
CompletableFuture<List<String>> ids = ifhIds(); //1
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { //2
Stream<CompletableFuture<String>> zip = //3
l.stream().map(i -> {
CompletableFuture<S tring> n ameTask = ifhName(i); //4
CompletableFuture<Integer> statTask = ifhStat(i); //5
return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); //6
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); //7
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); //8
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join) //9
.collect(Collectors.toList()));
});
List<String> results = result.join(); //10
assertThat(results).contains(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
1.我们从一个可以为我们提供ID值列表的future开始。
2.一旦获得列表,我们想开始更深层次的异步处理。
3.对于列表中的每个元素:
4.异步获取关联名称。
5.异步获取关联的任务。
6.合并两个结果。
7.现在,我们有了代表所有组合任务的期货清单。 要执行这些任务,我们需要将列表转换为数组。
8.将数组传递给CompletableFuture.allOf,该数组将输出一个Future,当所有任务完成时,该Future将完成。
9.棘手的一点是allOf返回CompletableFuture <Void>,因此我们在future列表上重申,使用join()收集其结果(此处不会阻塞,因为allOf确保future全部完成了)。
10.一旦整个异步管道被触发,我们等待它被处理并返回可以声明的结果列表。
由于Reactor提供了更多组合运算符,因此可以简化此过程,如下所示:
例9.与以后的代码等效的Reactor代码的示例
Flux<String> ids = ifhrIds(); //1
Flux<String> combinations =
ids.flatMap(id -> { //2
Mono<String> nameTask = ifhrName(id); //3
Mono<Integer> statTask = ifhrStat(id); //4
return nameTask.zipWith(statTask, //5
(name, stat) -> "Name " + name + " has stats " + stat);
});
Mono<List<String>> result = combinations.collectList(); //6
List<String> results = result.block(); //7
assertThat(results).containsExactly( //8
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
1.这次,我们从异步提供的ID序列(Flux <String>)开始。
2.对于序列中的每个元素,我们都会异步处理它两次(在函数flatMap调用内部)。
3.获取关联的名称。
4.获取相关的统计信息。
5.异步组合两个值。
6.在值可用时将其汇总到一个列表中。
7.在生产中,我们将通过进一步组合或订阅它来继续异步使用Flux。 最有可能的是,我们将返回结果Mono。 由于我们正在测试中,
因此我们改为阻塞,等待处理完成,然后直接返回汇总的值列表。
8.断言结果。
使用回调和Future对象的风险是相似的,这是发布者对订阅者对响应式编程的解决。
3.From Imperative to Reactive Programming(从命令式编程到反应式编程)
反应性库(例如Reactor)旨在解决JVM上“经典”异步方法的这些缺点,同时还着重于其他一些方面:
1.可组合性和可读性
2.以丰富的运算符词汇操纵数据流
3.订阅之前没有任何反应
4.背压或消费者向生产者发出排放速率过高信号的能力
5.High level but high value abstraction that is concurrency-agnostic(并发不可知的高级但高价值的抽象)
3.1 Composability and Readability(可组合性和可读性)
所谓“可组合性”,是指协调多个异步任务的能力,其中我们使用先前任务的结果将输入反馈给后续任务。
另外,我们可以以fork-join样式运行多个任务。 此外,我们可以将异步任务重用为更高级别系统中的离散组件。
编排任务的能力与代码的可读性和可维护性紧密相关。 随着异步处理层的数量和复杂性的增加,能够编写和读取代码变得越来越困难。
如我们所见,回调模型很简单,但是它的主要缺点之一是,对于复杂的流程,您需要从一个回调中执行一个回调,该回调本身嵌套在另一个回调中,依此类推。
这种混乱被称为“回调地狱”。 您可以猜测(或从经验中学到),很难找到这样的代码并进行推理。
Reactor提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且通常将所有内容保持在同一级别(将嵌套最小化)。
3.2 The Assembly Line Analogy 流水线类比
You can think of data processed by a reactive application as moving through an assembly line.
Reactor is both the conveyor belt and the workstations. The raw material pours from a source (the original Publisher)
and ends up as a finished product ready to be pushed to the consumer (or Subscriber).
您可以将反应式应用程序处理的数据视为流水线。 Reactor既是传送带又是工作站。 原材料从来源(原始发布者)倾泻而出,最终成为准备好推向消费者(或订阅者)的成品。
The raw material can go through various transformations and other intermediary steps
or be part of a larger assembly line that aggregates intermediate pieces together.
If there is a glitch or clogging at one point (perhaps boxing the products takes a disproportionately long time),
the afflicted workstation can signal upstream to limit the flow of raw material.
原材料可以经过各种转换和其他中间步骤,也可以成为将中间件聚集在一起的较大装配线的一部分。 如果某一点出现故障或堵塞(也许装箱产品花费的时间过长),
那么受灾的工作站可以向上游发出信号,以限制原材料的流动。
3.3 Operators 运算符
In Reactor, operators are the workstations in our assembly analogy. Each operator adds behavior to a Publisher
and wraps the previous step’s Publisher into a new instance. The whole chain is thus linked,
such that data originates from the first Publisher and moves down the chain, transformed by each link.
Eventually, a Subscriber finishes the process. Remember that nothing happens until a Subscriber subscribes to a Publisher,
as we see shortly.
在Reactor中,operators是我们装配类比中的工作站。 每个operators都会向Publisher添加行为,并将上一步的发布服务器包装到新实例中。
因此,整个链被链接在一起,这样数据就从第一个Publisher发出并在链中向下移动,并由每个链接转换。 最终,订阅者完成了该过程。
请记住,在订阅者订阅发布者之前,什么也不会发生,正如我们不久将看到的那样。
Understanding that operators create new instances can help you avoid a common mistake that would lead you to believe
that an operator you used in your chain is not being applied. See this item in the FAQ.
了解operators会创建新的实例可以帮助您避免一个常见错误,该错误会导致您认为未应用您在链中使用的运算符。 请参阅常见问题解答中的此项。
While the Reactive Streams specification does not specify operators at all, one of the best added values of reactive libraries,
such as Reactor, is the rich vocabulary of operators that they provide.
These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.
虽然反应式流规范根本没有指定运算符,但是反应式库的最佳附加值之一(例如Reactor)是它们提供的运算符的丰富词汇表。 从简单的转换和过滤到复杂的编排和错误处理,这些内容涉及很多领域。
3.4 Nothing Happens Until You subscribe()
In Reactor, when you write a Publisher chain, data does not start pumping into it by default. Instead, you create an abstract description
of your asynchronous process (which can help with reusability and composition).
在Reactor中,当您编写发布者链时,默认情况下不会开始将数据泵入其中。 相反,您可以创建异步过程的抽象描述(这有助于重用和组合)。
By the act of subscribing, you tie the Publisher to a Subscriber, which triggers the flow of data in the whole chain.
This is achieved internally by a single request signal from the Subscriber that is propagated upstream,
all the way back to the source Publisher.
通过订阅,您将发布者与订阅者绑定在一起,从而触发了整个链中的数据流。 这是通过来自订阅服务器的单个请求信号在内部实现的,该请求信号向上游传播,一直返回到源发布服务器。
3.5 Backpressure
Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy
as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.
上游传播的信号也用于实现背压,我们在组装流水线中将其描述为当工作站的处理速度比上游工作站慢时,沿生产线向上发送的反馈信号。
The real mechanism defined by the Reactive Streams specification is pretty close to the analogy: A subscriber
can work in unbounded mode and let the source push all the data at its fastest achievable rate or
it can use the request mechanism to signal the source that it is ready to process at most n elements.
Reactive Streams规范定义的实际机制与类推非常接近:订户可以以无界模式工作,并让源以最快可达到的速率推送所有数据,
或者可以使用请求机制向源发出信号,告知它 准备处理最多n个元素。
Intermediate operators can also change the request in-transit. Imagine a buffer operator that groups elements in batches of ten.
If the subscriber requests one buffer, it is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies,
which avoid request(1) round-trips and is beneficial if producing the elements before they are requested is not too costly.
中间的operators还可以在途中更改请求。 想象一下一个缓冲运算符,它将元素以十个为一组进行分组。 如果subscriber请求一个缓冲区,则源产生十个元素是可以接受的。
一些运算符还实现了预取策略,避免了request(1)往返,如果在请求之前生成元素的成本不太高的话,这将是有益的。
This transforms the push model into a push-pull hybrid, where the downstream can pull n elements from upstream if they are readily available.
But if the elements are not ready,they get pushed by the upstream whenever they are produced.
这将推模型转换为推挽混合模型,如果容易获得,则下游可以从上游拉取n个元素。 但是,如果元素尚未准备就绪,则每当它们被生产时就会被上游推送。
3.6 Hot vs Cold
The Rx family of reactive libraries distinguishes two broad categories of reactive sequences: hot and cold.
This distinction mainly has to do with how the reactive stream reacts to subscribers:
Rx反应库的家族将反应序列分为两大类:热和冷。 这种区别主要与反应流对订阅者的反应有关:
A Cold sequence starts anew for each Subscriber, including at the source of data. For example,
if the source wraps an HTTP call, a new HTTP request is made for each subscription.
对于每个订阅者,包括在数据源处,冷序列都会重新开始。 例如,如果源包装了一个HTTP调用,则会为每个订阅发出一个新的HTTP请求。
A Hot sequence does not start from scratch for each Subscriber. Rather, late subscribers receive
signals emitted after they subscribed. Note, however, that some hot reactive streams can cache or
replay the history of emissions totally or partially. From a general perspective,
a hot sequence can even emit when no subscriber is listening (an exception to the “nothing happens before you subscribe” rule).
并非每个订阅者都可以从头开始热序列。 相反,后订阅的订阅者会在订阅后接收发射的信号。 但是请注意,某些热反应流可以全部或部分缓存或重放排放历史。
从一般的角度来看,即使没有订阅者在收听,热序列甚至会发出(“订阅之前什么也没有发生”规则的例外)。
For more information on hot vs cold in the context of Reactor, see this reactor-specific section.
网友评论