本节提供有关SpringFramework5中Web应用程序的反应式编程支持的基本信息。
Reactive Streams 是一个标准或者说是规范,由 Netflix、TypeSafe、Pivotal 等公司发起的。
Reactor 是 Spring 中的一个子项目,一个基于 java 的响应式编程框架,该框架实现了 Reactive Programming(反应式编程即响应式编程) 思想,符合 Reactive Streams 规范。Mono 和 Flux 是 Reactor 中非常重要的两个类。
Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。
一、介绍
1.1 什么是反应式编程
简单地说,反应式编程是关于异步和事件驱动的非阻塞应用程序,需要少量线程来支持更多的请求。我们通过优先考虑纵向扩展,在JVM中性能提升到极致;而不优先考虑水平扩张,通过集群来支持高性能。
反应式应用的一个关键方面是反压的概念,它可以确保生产者不会拖垮消费者。例如,在从数据库到HTTP响应的反应式组件管道中,当HTTP连接速度太慢时,数据存储库也可以完全减速或停止,直到释放网络容量。
反应式编程也引领了从命令式到声明式异步组合逻辑的重大转变。它与编写阻塞代码相比,可以使用Java 8的CompletableFuture,通过lambda表达式组成后续动作。
要了解更多的介绍,请查看DaveSyer的博客系列“反应式编程笔记”。
例如:在一个管道中,针对数据库的反应式组件,提供为HTTP响应。当HTTP连接太慢时,数据存储库也会减慢速度或者完全停止,直到释放网络容量。
备注:
1.反应式编程框架主要采用了观察者模式,而SpringReactor的核心则是对观察者模式的一种衍伸。以上提到的生产者为被观察者,消费者为观察者;
2.反压机制:指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
1.2 Reactive API和构建块
SpringFramework5将Reactive Streams 作为在异步组件和库之间通信反压契约。
Reactive Streams是通过行业协作创建的规范,它在Java 9中作为java.util.concurrent.Flow被采用。
Spring Framework 内部使用Reactor作为反应式支撑。Reactor是Reactive Streams的实现,它进一步扩展了与Flux和Mono可组合API类型的基本的 Reactive Streams Publisher 契约,以对0..n和0..1的数据序列提供声明性操作。
Spring框架在许多自己的反应式API中公开了Flux和Mono。然而,在应用级别,和往常一样,Spring提供了选择,并且完全支持RxJava的使用。
有关反应类型的更多信息,请查看Sebastien Deleuze的 "Understanding Reactive Types"文章。.
二、Spring WebFlux Module
SpringFramework5包括一个新的SpringWebFlux模块。该模块包含对反应式HTTP和WebSocket客户端的支持,以及对反应式服务器Web应用程序(包括REST、HTML浏览器和WebSocket风格的交互)的支持。
2.1 服务端
在服务器端,WebFlux支持两种不同的编程模型:
基于@controller的注解以及Spring MVC支持的其他注释;
Functional,Java 8 lambda风格routing和handling。
这两种编程模型都是在相同的反应式基础上执行的,它将非阻塞HTTP运行容器适应于Reactive Streams API。下图显示了服务器端栈,包括SpringWebMVC模块左侧基于Servlet的传统SpringMVC,以及SpringWebFlux模块右侧的反应式栈。
WebFlux可以在支持Servlet 3.1非阻塞IO API的Servlet容器上运行,也可以在Netty和Undertow等其他异步框架上运行。
每次运行时都适应于反应式ServerHTTPRequest和ServerHTTPResponse,将请求和响应的主体显示为Flux<dataBuffer>,而不是带有反应式反压的InputStream 和OutputStream。REST样式的JSON 和XML 序列化和反序列化作为Flux<Object>在顶部受支持,HTML 视图呈现和Server-Sent也受支持。
基于注解的编程模型:
WebFlux还支持相同的@Controller编程模型和SpringMVC中使用的注解。主要区别为基础核心、框架契约。例如:HandlerMapping,HandlerAdapter是非阻塞的,且在反应式的ServerHttpRequest和ServerHttpResponse,优于 HttpServletRequest 和 HttpServletResponse。以下为反应式controller示例:
函数模式模型:
HandlerFunctions
接收到的HTTP 请求被HandlerFunction执行。HandlerFunction实际上就是个接收ServerRequest,返回Mono<ServerResponse>的函数。HandlerFunction在方法上可以通过注解@RequestMapping来实现。
ServerRequest和ServerResponse是不可变的接口,提供对底层HTTP消息的JDK-8友好访问。在Reactor顶部构建完全的反应式:请求将主体公开为Flux或Mono;响应接受任何 Reactive Streams 发布者的主体。
ServerRequest 提供了访问各种HTTP元素,包括:请求方法、URI、请求参数,也可以通过单独的ServerRequest.Headers接口获取头信息。访问body可以通过body方法。例如,这是如何将从请求body中提取为Mono<string>:
Mono<String> string = request.bodyToMono(String.class);
以下示例中,将 body内容反序列化为Person实体的方法(前提是body包含JSON,则由Jackson支持Person;如果主体包含XML,则由JAXB支持Person)。
Flux<Person> people = request.bodyToFlux(Person.class);
上面的两种方法(BodyToMono和BodyToFlux)实际上是使用ServerRequest.Body(BodyExtractor)方法的方便方法。
BodyExtractor是一个函数式策略接口,允许您编写自己的提取逻辑,但是公共的BodyExtractor实例在BodyExtractors工具类中可以找到。因此,上述示例可以替换为:
Mono<String> string = request.body(BodyExtractors.toMono(String.class);
Flux<Person> people = request.body(BodyExtractors.toFlux(Person.class);
类似地,ServerResponse 提供访问HTTP response。因为它是不可变的,所以可以使用构建器创建一个ServerResponse。该构造器允许您设置响应状态、添加响应头和提供主体。例如:这是如何创建具有 200 OK 状态、JSON content-type和body的response:
Mono<Person> person = ...
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(person);
下面是如何构建一个具有 201 Created 状态、Location header,和空body的响应:
URI location = ...
ServerResponse.created(location).build();
把这些放在一起可以创建一个HandlerFunction。例如:这里是一个简单的“Hello World”handler lambda示例,返回response状态为200,且返回为String类型:
正如我们上面所做的那样,将处理程序函数编写为lambda函数是方便的,但可能缺乏可读性,在处理多个函数时会变得不易维护。因此,建议将相关的处理程序函数分组为处理程序或控制器类。例如,这里有一个类,它公开了一个反应式Personrepository:
listPeople 是一个handler函数,返回了数据库中所有Person对象,格式为JSON。
createPerson是一个handler函数,存储一个新的Person对象,该Person对象从请求body中获取。请注意,PersonRepository.savePerson(Person) 返回Mono<Void>:一个空的Mono,用作发射person保存完成信号。因此我们使用build(Publisher<Void>)方法当接收到完成信号时,返回响应信息。
getPerson 是一个handler函数返回一条person信息,通过路径变量id标识。如果在数据库中找到,我们创建JSON响应;如果找不到,通过 otherwiseIfEmpty(Mono<T>) 返回404 Not Found响应。
RouterFunctions
接收的请求通过RouterFunction路由到具体处理函数,该路由函数接受ServerRequest,并返回Mono<HandlerFunction>。如果请求匹配上路由,将返回一个处理函数;否则将返回一个空的Mono。RouterFunction的用途与@controller类中的@requestmapping注解类似。
通常,你可以通过RouterFunctions.route(RequestPredicate, HandlerFunction) 来创建路由函数。如果断言匹配,请求将被路由到指定的处理函数;否则,无路由被执行,返回404 Not Found响应。
虽然您可以编写自己的RequestPredicate,但没有必要,因为RequestPredicates实用程序类提供了常用的谓词,例如基于路径、HTTP方法、content-type等进行匹配。如下示例中,使用route,我们可以路由到“hello world”处理程序函数:
RouterFunction helloWorldRoute =RouterFunctions.route(RequestPredicates.path("/hello-world"),request -> Response.ok().body(fromObject("Hello World")));
两个路由器函数可以组成一个新的路由器函数,路由到任一处理程序函数:如果第一个路由的谓词不匹配,则计算第二个。组合路由器函数按顺序进行匹配,因此将特定函数放在通用函数之前是有意义的。
您可以通过调用RouterFunction.and(routerFunction)或调用RouterFunction.andRoute(requestPredicate,handlerFunction)来组合两个路由器函数,这是RouterFunction.and()与RouterFunctions.route()的方便组合。
考虑到上面显示的PersonHandler,我们现在可以定义一个路由到各自处理程序函数的路由器函数。我们使用 method-references 来引用处理程序函数:
除了路由功能之外,您还可以通过调用RequestPredicate.and(RequestPredicate)或RequestPredicate.or(RequestPredicate)组成请求谓词。RequestPredicate.and(RequestPredicate)为两个条件都匹配为匹配;RequestPredicate.or(RequestPredicate)为有一个条件匹配则为匹配。在请求谓词中找到的大多数谓词都是复合谓词。例如,RequestPredicates.GET(String)是RequestPredicates.method(Httpmethod)和RequestPredicates.path(string)的组合。
public static RequestPredicate GET(String pattern) {
return method(HttpMethod.GET).and(path(pattern));
}
运行容器
现在只缺少一个难题:在HTTP服务器中运行一个路由器函数。可以使用RouterFunctions.toHttpHandler(RouterFunction)将路由器函数转换为HttpHandler。HttpHandler允许您在各种各样的反应式服务器上运行:Reactor Netty、RxNetty、Servlet 3.1+和Undertow。以下是我们如何在Reactor Netty上运行路由功能,例如:
对于Tomcat来说,情况如下:
TODO: DispatcherHandler
HandlerFilterFunction
路由函数映射的路由可以通过调用RouterFunction.filter(HandlerFilterFunction)进行筛选,其中HandlerFilterFunction本质上是一个接受ServerRequest和HandlerFunction并返回ServerResponse的函数。
handler函数参数表示链中的下一个元素:这通常是路由到的HandlerFunction ,但如果应用了多个筛选器,也可以是另一个FilterFunction。通过注解,可以使用@ControllerAdvice and/or ServletFilter实现类似的功能。让我们为路由添加一个简单的安全过滤器,假设我们有一个SecurityManager可以确定是否允许特定路径:
在这个示例中,您可以看到调用next.handle(ServerRequest)是可选的:我们在允许访问时执行handler函数。
2.2客户端
WebFlux包括一个函数式、反应式的WebClient,它提供了一个完全非阻塞和反应式的RestTemplate替代方案。它将网络输入和输出公开为反应式ClientHttpRequest 和ClientHttpResponse ,其中请求和响应的主体Flux<DataBuffer>而不是InputStream和OutputStream。此外,它支持与服务器端相同的反应式JSON、XML和SSE序列化机制,因此您可以使用类型化对象。
下面是一个使用WebClient的示例,它需要一个ClientHttpConnector实现来支持一个特定的HTTP客户端,如Reactor Netty:
AsyncRestTemplate也支持非阻塞交互。主要的区别在于它不能支持非阻塞流,比如Twitter one,因为从根本上说它仍然基于并依赖于InputStream 和 OutputStream。
2.3Request与Response体转换
spring-core模块提供了反应式Encoder和Decoder契约,可以对类型化对象与Flux字节流进行序列化转换。SpringWeb模块添加了JSON(Jackson)和XML(JAXB)实现,用于Web应用程序以及其他SSE流和zero-copy文件传输。
例如,请求主体可以是以下方式之一,它将在注解和函数式编程模型中自动解码:
Account account— account在被controller调用前,在非阻塞情况下,被反序列化。
Mono<Account> account—controller使用Mono声明逻辑,在account被反序列化后执行。
Single<Account> account—与Mono的使用一样,但在RxJava中使用。
Flux<Account> accounts— 输入流场景。
Observable<Account> accounts— 使用RxJava的输入流。
响应主体可以是以下内容之一:
Mono<Account>— 当Mono完成时,在不阻塞给定Account的情况下进行序列化。
Single<Account>— 与Mono的使用一样,但在RxJava中使用。
Flux<Account>— 流场景,可能是SSE,具体取决于请求的内容类型。
Observable<Account>—使用RxJava Observable类型的流场景。
Flowable<Account>—使用RxJava 2流类型。
Flux<ServerSentEvent>— SSE 流。
Mono<Void>— 当Mono完成时,请求处理完成。
Account—在不阻塞给定Account的情况下进行序列化;表示同步的、非阻塞的controller方法。
void—特定于基于注解的编程模型,请求处理在方法返回时完成;表示一个同步的、非阻塞的controller方法。
当使用流类型(如Flux或Observable)时, request/response或mapping/routing级别中指定的媒体类型用于确定应如何序列化和刷新数据。例如,默认情况下,返回Flux<User>的REST端将按以下方式序列化:
application/json:Flux<User>作为异步集合处理, 在发出完成事件时,被序列化为jJSON数组(具有显式刷新)。
application/stream+json: Flux<User> 将作为一个用户元素流处理,这些用户元素被序列化为单独的JSON对象,用新行分隔,并在每个元素之后显式刷新。WebClient支持JSON流解码,因此这是服务器到服务器用例的一个很好的用例。
text/event-stream:Flux<User>或Flux<ServerSentEvent<User>> 将作为用户或ServerSentEvent 元素流处理,这些元素在默认情况下被序列化为单个SSE元素,在每个元素之后使用JSON 进行数据编码和显式刷新。这非常适合向浏览器客户端公开流。WebClient也支持读取SSE流。
2.4 反应式WebSocket 支持
WebFlux包括反应式WebSocket客户端和服务器支持。客户端和服务器都支持Java WebSocket API(JSR-356)、Jetty、UntToW、NeLTR NETY和RxNetty。
在服务器端,声明为WebSocketHandlerAdapter,然后简单地添加到基于WebSocketHandler端点的映射:
在客户端,为上面列出的受支持库之一创建WebSocketClient:
2.5 测试
spring-test模块包括一个WebTestClient,它可以用来测试WebFlux服务器端点是否有运行的服务器。没有运行服务器的测试可以与SpringMVC中的MockMVC相媲美,在这里使用模拟请求和响应,而不是使用socket进行网络连接。但是,WebTestClient也可以对正在运行的服务器执行测试。
有关更多信息,请参见框架中的示例测试。
三、 准备开始
3.1Spring Boot Starter
通过http://start.spring.io提供的Spring Boot Web Reactive Starter是最快的入门方法。它完成了所有必要的工作,所以您可以像编写SpringMVC那样开始编写@controller类。只需转到http://start.spring.io,选择2.0.0.build-snapshot,然后在Dependencies框中键入reactive。默认情况下starter使用Tomcat运行,但是依赖项可以像往常一样通过Spring boot更改,以切换到不同的运行服务器。有关更多详细信息和说明,请参阅起始页。
此starter 还支持函数式Web API,并将自动检测RouterFunction类型的bean。您的Spring Boot WebFlux应用程序应该使用RouterFunction或RequestMapping方法,目前不可能在同一个应用程序中混合使用它们。
3.2手动引导
本节概述了手动启动和运行的步骤。
对于依赖项,从spring-webflux和spring-context开始。然后添加jackson-databind和io.netty:netty-buffer(暂时参见 SPR-14528)以获得JSON支持。最后,为支持的某个运行时添加依赖项:
Tomcat —org.apache.tomcat.embed:tomcat-embed-core
Jetty —org.eclipse.jetty:jetty-server 和 org.eclipse.jetty:jetty-servlet
Reactor Netty —io.projectreactor.ipc:reactor-netty
RxNetty —io.reactivex:rxnetty-common 和 io.reactivex:rxnetty-http
Undertow —io.undertow:undertow-core
对于基于注解的编程模型引导:
上面加载了默认的Spring Web框架配置(1),然后创建了一个DispatcherHandler,即驱动请求处理的主类(2),并将其适配到HTTPHandler——反应式HTTP请求处理的最低级别的Spring抽象。
对于函数式编程模型引导,如下所示:
上面创建了一个AnnotationConfigApplicationContext 实例(1),它可以利用新的函数式bean注册API(2)来使用Java 8 Supplier注册bean,或者仅仅通过指定它的类(3)来注册bean。HttpHandler 是使用WebHttpHandlerBuilder(4)创建的。
然后,可以在支持的运行环境之一中安装HttpHandler:
对于Servlet 容器,特别是对于WAR 部署,您可以使用AbstractAnnotationConfigDispatcherHandlerInitializer ,它作为WebApplicationInitializer 由Servlet 容器自动检测。它负责注册ServletHttpHandlerAdapter,如上图所示。为了指向您的Spring配置,您需要实现一个抽象方法。
3.3 示例
在以下项目中,您将发现代码示例对于构建反应式Web应用程序很有用:
Spring Boot Web Reactive Starter: 反应式Starter可以通过 http://start.spring.io 生成。
Spring Reactive Playground: 大多数Spring Web响应式特性的展示。
Reactor website: spring-functional分支是一个Spring 5功能,Java 8 lambda风格的应用程序
Spring Reactive University session: live-coded 工程来自 this Devoxx BE 2106 university talk
Mix-it 2017 website: Kotlin + Reactive + Functional web and bean registration API application
Reactor by example: 来自此的代码段 InfoQ article
Spring integration tests: 用Reactor测试的各种特性 StepVerifier
与Flux和Mono可组合API类型的
网友评论