1 背景:
后端基础架构,在业务层上面通常还有一个是应用层MIMP(业务网关),用来做用户权限校验(cookie 校验),C端接口数据聚合等,由于所有C端业务都要经过应用层,所以应用层是个典型的IO密集型的场景。随着业务发展,流量越来越大,对应用层容器节点要求越来越高,从4核8G 8节点扩展到 8核 16G 32节点,MIMP公共转发(业务网关)是引用http同步调用,性能较差,需要改造优化,把转发以及调用异步化,从而引入了 WebClient。
应用层发展阶段:
-
在应用层调用微服务API,且聚合数据后返回给C端用户;
-
业务越来越大越多,每个开发都在应用层写接口导致应用层特别冗余,所以特意提供一个公共转发接口,直接转发C端接口,所有业务处理和数据聚合都在业务微服务层中处理;
-
业务流量越来越大,在应用层通过使用同步阻塞式 I/O 模型(Servlet API)去请求微服务,出现线程阻塞,在应用层耗费时间越来越多,所以应用层通用转发接口改造成使用WebClient,去处理大量的并发请求;
本篇文章中涉及到的Reactor 和 Netty相关知识,请参考 Netty理论三:Netty线程模型 Netty系列
2 WebClient 介绍以及使用
WebClient是WebFlux框架中重要的Http请求框架。同时也是Spring官方的Http请求工具,相当于SpringMVC框架中的RestTemplate。 在日常使用中,WebClient通常是使用WebClient.Builder来完成构建的。为了方便日常使用,笔者将日常使用到的场景封装了一个工具类,见文章最后的附录。
2.1 传统阻塞IO模型 VS 响应式IO模型
- 传统阻塞IO模型 RestTemplate
Spring3.0引入了RestTemplate,SpringMVC或Struct等框架都是基于Servlet的,其底层IO模型是阻塞IO模型。采用阻塞IO模式获取输入数据。每个连接都需要独立的线程,完成数据输入、业务处理、返回。传统阻塞IO模型的问题是,当并发数很大时,就要创建大量线程,占用很大的系统资源。连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费。
传统阻塞IO模型- 响应式IO模型 WebClient
Spring5中引入了WebClient作为非阻塞式Reactive Http客户端。Spring社区为了解决SpringMVC的阻塞模型在高并发场景下的性能瓶颈,推出了Spring WebFlux,WebFlux底层实现是久经考验的Netty非阻塞IO通信框架。其实WebClient处理单个HTTP请求的响应时长并不比RestTemplate更快,但是它处理并发的能力更强,非阻塞的方式可以使用较少的线程以及硬件资源来处理更多的并发。
所以响应式非阻塞IO模型的核心意义在于,提高了单位时间内有限资源下的服务请求的并发处理能力,而不是缩短了单个服务请求的响应时长。
img2.2 RestTemplate vs WebClient
与RestTemplate相比,WebClient的优势
-
非阻塞响应式IO,单位时间内有限资源下支持更高的并发量。
-
支持使用Java8 Lambda表达式函数。
-
支持同步、异步、Stream流式传输。
使用webClient在等待远程响应的同时不会阻塞本地正在执行的线程 ;本地线程处理完一个请求紧接着可以处理下一个,能够提高系统的吞吐量;而restTemplate 这种方式是阻塞的,会一直占用当前线程资源,直到http返回响应。如果等待的请求发生了堆积,应用程序将创建大量线程,直至耗尽线程池所有可用线程,甚至出现OOM。另外频繁的CPU上下文切换,也会导致性能下降。
但是作为上述两种方式的调用方(消费者)而言,其最终获得http响应结果的耗时并未减少。
使用webclient替代restTemplate的好处是可以异步等待http响应,使得线程不需要阻塞;单位时间内有限资源下支持更高的并发量。
2.3 WebClient的线程模型
图片-
如上当调用线程使用webclient发起请求后,内部会先创建一个Mono响应对象,然后切换到IO线程具体发起网络请求。
-
调用线程获取到Mono对象后,一般会订阅,也就是设置一个Consumer用来具体处理服务端响应结果。
-
服务端接受请求后,进行处理,最后把结果写回客户端,客户端接受响应后,使用IO线程把结果设置到Mono对象,从而触发设置的Consumer回调函数的执行。
WebClient默认内部使用Netty实现http客户端调用,这里IO线程其实是netty的IO线程,而netty客户端的IO线程内是不建议做耗时操作的,因为IO线程是用来轮训注册到select上的channel的数据的,如果阻塞了,那么其他channel的读写请求就会得不到及时处理。所以如果consumer内逻辑比较耗时,建议从IO线程切换到其他线程来做。
那么如何切换那?可以使用publishOn把IO线程切换到自定义线程池进行处理:
resp.publishOn(Schedulers.elastic())//切换到Schedulers.elastic()对应的线程池进行处理
.onErrorMap(throwable -> {
System.out.println("onErrorMap:" + throwable.getLocalizedMessage());
return throwable;
})
.subscribe(s -> System.out.println("result:" + Thread.currentThread().getName() + " " + s));
也就是说,WebClient调用微服务返回结果后,最好是直接返回给C端用户,如果一定要对返回的结果做数据耗时操作,那么就需要线程切换到自定义线程池进行处理,避免Boss Group线程池阻塞(参考 Netty理论三:Netty线程模型)。
3 Spring WebFlux
Spring WebFlux 是Spring 5 推出的,是一个异步非阻塞式的 Web 框架,底层也是基于Netty实现的,所以,它特别适合应用在 IO 密集型的服务中,比如微服务网关这样的应用中。
IO 密集型包括:磁盘IO密集型, 网络IO密集型,微服务网关就属于网络 IO 密集型,使用异步非阻塞式编程模型,能够显著地提升网关对下游服务转发的吞吐量。
img3.1 Spring Mvc vs Spring WebFlux
Spring MVC 构建于 Servlet API 之上,使用的是同步阻塞式 I/O 模型,每一个请求对应一个线程去处理。 Spring WebFlux 是一个异步非阻塞式的 Web 框架,它能够充分利用多核 CPU 的硬件资源去处理大量的并发请求。
相同点:
-
都可以使用 Spring MVC 注解,如
@Controller
, 方便我们在两个 Web 框架中自由转换; -
均可以使用 Tomcat, Jetty, Undertow Servlet 容器(Servlet 3.1+);
注意点:
-
Spring MVC 因为是使用的同步阻塞式,更方便开发人员编写功能代码,Debug 测试等,一般来说,如果 Spring MVC 能够满足的场景,就尽量不要用 WebFlux;
-
WebFlux 默认情况下使用 Netty 作为服务器,(启动时输出中包含
Netty started on port(s): 8080
语句时) -
WebFlux 不支持 MySql
WebFlux 不是 Spring MVC 的替代方案!,虽然 WebFlux 也可以被运行在 Servlet 容器上(需是 Servlet 3.1+ 以上的容器),但是 WebFlux 主要还是应用在异步非阻塞编程模型,而 Spring MVC 是同步阻塞的,如果你目前在 Spring MVC 框架中大量使用非同步方案,那么,WebFlux 才是你想要的,否则,使用 Spring MVC 才是你的首选。
WebFlux 相比于SpringMvc虽然可以极大的提升吞吐量,但是也不是没有副作用,像学习曲线高、调试难以及不支持jdbc等数据库,而NoSql数据库相对支持较为完善,而且在微服务架构中,Spring MVC 和 WebFlux 可以混合使用,比如已经提到的,对于那些 IO 密集型服务(如网关),我们就可以使用 WebFlux 来实现。开头背景中,我们就是这样混用改造的应用层的。
img3.2 WebFlux 的优势&提升性能
WebFlux 内部使用的是响应式编程(Reactive Programming),以 Reactor 库为基础, 基于异步和事件驱动,可以让我们在不扩充硬件资源的前提下,提升系统的吞吐量和伸缩性。 WebFlux 并不能使接口的响应时间缩短,它仅仅能够提升吞吐量和伸缩性。
Spring WebFlux底层是通过Netty通信的,关于Netty相关知识,见 Netty系列
4 spring-cloud-Gateway vs Nginx
-
spring-cloud-Gateway: 微服务网关,实现微服务的统一路由,统一鉴权,跨域,限流等功能;
-
Nginx :高性能HTTP和反向代理的web服务器,处理高并发能力是十分强大,最高能支持5w个并发连接数;
有了Nginx 做网关,为啥还要用到spring-cloud-Gateway呢?
首先这两种网关的定义不一样
-
业务网关:spring-cloud-Gateway的定义是针对每一个业务微服务来得,属于业务网关;
对于具体的后端业务应用或者是服务和业务有一定关联性的策略网关就是下图左边的架构模型——业务网关。 业务网关针对具体的业务需要提供特定的流控策略、缓存策略、鉴权认证策略等等。
-
流量网关:Nginx针对的是用户访问的总入口,也就是前端页面的容器,属于流量网关;
与业务网关相反,定义全局性的、跟具体的后端业务应用和服务完全无关的策略网关就是下图右边所示的架构模型——流量网关。流量网关通常只专注于全局的Api管理策略,比如全局流量监控、日志记录、全局限流、黑白名单控制、接入请求到业务系统的负载均衡等,有点类似防火墙。Kong 就是典型的流量网关。公司内通常会提供统一的公共网关;
这里需要补充一点的是,业务网关一般部署在流量网关之后、业务系统之前,比流量网关更靠近业务系统。通常API网指的是业务网关。 有时候我们也会模糊流量网关和业务网关,让一个网关承担所有的工作,所以这两者之间并没有严格的界线。
Nginx与spring-cloud-Gateway的区别:
-
Nginx 是用户到前端工程的网关,对外网关;是用C语言写的,自定义扩展的话,要么写C要么写lua;
-
spring-cloud-Gateway是微服务网关,是前端工程到后台服务之间的一个对内网关;是java语言的一个框架,可以在框架上进行代码的扩展与控制,例如:安全控制,统一异常处理,XXS,SQL注入等;权限控制,黑白名单,性能监控,日志打印等;
-
spring-cloud-Gateway 的主要功能有,路由,断言,过滤器,利用它的这些特性,可以做流控;、
-
Nginx 做网关,更多的是做总流量入口,反向代理,负载均衡等,还可以用来做web服务器。
比如,在架构中,部署在阿里云上的访问接入层SLB。内部是一个LVS+Nginx实现的四层+七层负载均衡(作用是 反向代理 负载均衡);
而在应用网关中使用SpringGateway/Zuul做统一鉴权;
spring-cloud-Gateway架构图:
image.pngspring-cloud-Gateway请求处理模型
Gateway请求处理模型SpringCloud Gateway是基于WebFlux框架实现的,而WebFlux框架底层则使用了高性能的Reactor模式通信框架Netty。
可以看到NettyServer的Boss Group线程池内的线程循环接收这个请求,然后把完成了TCP三次握手的连接channel交给Worker Group中的某一个事件循环线程来进行处理(该事件处理线程会调用对应的controller进行处理)。
所以WebFlux的handler执行是使用Netty的IO线程进行执行的,所以需要注意如果handler的执行比较耗时,会把IO线程耗尽导致不能再处理其他请求,可以通过Reactor的publishOn操作符切换到其他线程池中执行。
5 spring-cloud-Gateway vs Zuul
Zuul:
-
使用的是阻塞式的 API,不支持长连接,比如 websockets。
-
底层是servlet,Zuul处理的是http请求
-
没有提供异步支持,流控等均由hystrix支持。
-
依赖包spring-cloud-starter-netflix-zuul。
spring-cloud-Gateway:
-
Spring Boot和Spring Webflux提供的Netty底层环境,不能和传统的Servlet容器一起使用,也不能打包成一个WAR包。
-
依赖spring-boot-starter-webflux和/ spring-cloud-starter-gateway
-
提供了异步支持,提供了抽象负载均衡,提供了抽象流控,并默认实现了RedisRateLimiter。
5.1 相同点:
-
底层都是servlet;
-
两者均是web网关,处理的是http请求
5.2 不同点:
1、内部实现:
spring-cloud-Gateway对比zuul多依赖了spring-webflux,在spring的支持下,功能更强大,内部实现了限流、负载均衡等,扩展性也更强,但同时也限制了仅适合于Spring Cloud套件 zuul则可以扩展至其他微服务框架中,其内部没有实现限流、负载均衡等。 2、是否支持异步 zuul仅支持同步 spring-cloud-Gateway支持异步。理论上spring-cloud-Gateway则更适合于提高系统吞吐量(但不一定能有更好的性能),最终性能还需要通过严密的压测来决定 3、框架设计的角度 spring-cloud-Gateway具有更好的扩展性,并且其已经发布了2.0.0的RELESE版本,稳定性也是非常好的 4、性能 WebFlux 模块的名称是 spring-webflux,名称中的 Flux 来源于 Reactor 中的类 Flux。Spring webflux 有一个全新的非堵塞的函数式 Reactive Web 框架,可以用来构建异步的、非堵塞的、事件驱动的服务,在伸缩性方面表现非常好。使用非阻塞API。 Websockets得到支持,并且由于它与Spring紧密集成,所以将会是一个更好的 开发 体验。 Zuul 1.x,是一个基于阻塞io的API Gateway。Zuul已经发布了Zuul 2.x,基于Netty,也是非阻塞的,支持长连接,但Spring Cloud暂时还没有整合计划。
5.3 总结
总的来说,在微服务架构,如果使用了Spring Cloud生态的基础组件,则Spring Cloud Gateway相比而言更加具备优势,单从流式编程+支持异步上就足以让开发者选择它了。 对于小型微服务架构或是复杂架构(不仅包括微服务应用还有其他非Spring Cloud服务节点),zuul也是一个不错的选择。
网关的更多知识见 微服务架构下网关的技术选型
附录:
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 网络请求工具类
*
* @author admin
*/
public class WebClientHelper {
private static final Integer DEFAULT_CONNECT_TIMEOUT = 3000;
private static final Integer DEFAULT_REQUEST_TIMEOUT = 10000;
/**
* get请求解析成字符串
*
* @param url url
* @return java.lang.String
* @author admin admin
* @since 2019/10/30
*/
public static ClientResponse getResponse(String url) {
Mono<ClientResponse> resp = createWebClientWithConnectAndReadTimeOuts()
.get()
.uri(url)
.exchange();
return resp.block();
}
/**
* get请求,解析成对象
*
* @param url url
* @param tClass class
* @param headers 请求头
* @return T
* @author admin
* @since 2019/10/30
*/
public static <T> T get(String url, Class<T> tClass, Map<String, String> headers) {
Mono<T> resp = createWebClientWithConnectAndReadTimeOuts()
.get()
.uri(url)
.headers(t -> t.setAll(headers))
.retrieve()
.bodyToMono(tClass).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* get请求,解析成对象
*
* @param url url
* @param headers 请求头
* @return T
* @author admin
* @since 2019/10/30
*/
public static String get(String url, Map<String, String> headers) {
Mono<String> resp = createWebClientWithConnectAndReadTimeOuts()
.get()
.uri(url)
.headers(t -> t.setAll(headers))
.retrieve()
.bodyToMono(String.class).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* get请求,解析成对象
*
* @param scheme 协议 http/https
* @param host host
* @param obj query params
* @param headers 请求头
* @return T
* @author admin
* @since 2019/10/30
*/
public static String get(String scheme, String host, String path, Object obj, Map<String, String> headers) {
Mono<String> resp = createWebClientWithConnectAndReadTimeOuts()
.get()
.uri(uriBuilder -> uriBuilder.scheme(scheme).host(host).path(path).queryParams(getRequestParamMapByObj(obj)).build())
.headers(t -> t.setAll(headers))
.retrieve()
.bodyToMono(String.class).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* get请求,解析成对象
*
* @param url url
* @param tClass class
* @return T
* @author admin
* @since 2019/10/30
*/
public static <T> T get(String url, Object obj, Class<T> tClass) {
Mono<T> resp = createWebClientWithConnectAndReadTimeOuts()
.get()
.uri(uriBuilder -> uriBuilder.path(url).queryParams(getRequestParamMapByObj(obj)).build())
.retrieve()
.bodyToMono(tClass).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* get请求,解析成对象
*
* @param url url
* @param tClass class
* @return T
* @author admin
* @since 2019/10/30
*/
public static <T> T get(String url, Class<T> tClass) {
Mono<T> resp = createWebClientWithConnectAndReadTimeOuts()
.get()
.uri(url)
.retrieve()
.bodyToMono(tClass).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* get请求解析成字符串
*
* @param url url
* @return java.lang.String
* @author admin
* @since 2019/10/30
*/
public static String get(String url) {
Mono<String> resp = createWebClientWithConnectAndReadTimeOuts()
.get()
.uri(url)
.retrieve()
.bodyToMono(String.class).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* post表单请求返回对象
*
* @param url url
* @param params 请求参数
* @param tClass 返回对象
* @return T
* @author admin
* @since 2019/10/30
*/
public static <T> T post(String url, Map<String, String> params, Class<T> tClass) {
MultiValueMap<String, String> formData = getRequestParamMap(params);
Mono<T> resp = createWebClientWithConnectAndReadTimeOuts().post()
.uri(url)
.contentType(MediaType.APPLICATION_FORM_URLENCODED)
.body(BodyInserters.fromFormData(formData))
.retrieve().bodyToMono(tClass).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* post表单请求返回字符串
*
* @param url url
* @param params 请求参数
* @return java.lang.String
* @author admin
* @since 2019/10/30
*/
public static String post(String url, Map<String, String> params) {
MultiValueMap<String, String> formData = getRequestParamMap(params);
Mono<String> resp = createWebClientWithConnectAndReadTimeOuts().post()
.uri(url)
.contentType(MediaType.APPLICATION_FORM_URLENCODED)
.body(BodyInserters.fromFormData(formData))
.retrieve().bodyToMono(String.class).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* post json请求结果解析成对象
*
* @param url url
* @param jsonBody 请求body,可以是对象或者是map
* @param tClass 解析对象
* @return T
* @author admin
* @since 2019/10/30
*/
public static <T> T postJson(String url, Object jsonBody, Class<T> tClass) {
Mono<T> resp = createWebClientWithConnectAndReadTimeOuts().post()
.uri(url)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(Mono.just(jsonBody), Object.class)
.retrieve().bodyToMono(tClass).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* post json请求结果解析成对象
*
* @param url url
* @param jsonBody 请求body,可以是对象或者是map
* @param tClass 解析对象
* @return T
* @author admin
* @since 2019/10/30
*/
public static <T> T postJson(String url, Map<String, String> headers, Object jsonBody, Class<T> tClass) {
Mono<T> resp = createWebClientWithConnectAndReadTimeOuts().post()
.uri(url)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.headers(t -> t.setAll(headers))
.body(Mono.just(jsonBody), Object.class)
.retrieve().bodyToMono(tClass).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* post json请求结果解析成字符串
*
* @param url url
* @param jsonBody 请求body,可以是对象或者是map
* @return java.lang.String
* @author admin
* @since 2019/10/30
*/
public static String postJson(String url, Object jsonBody) {
Mono<String> resp = createWebClientWithConnectAndReadTimeOuts().post()
.uri(url)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(Mono.just(jsonBody), Object.class)
.retrieve().bodyToMono(String.class).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
/**
* post json请求结果解析成字符串
*
* @param url url
* @param jsonBody 请求body,可以是对象或者是map
* @return java.lang.String
* @author admin
* @since 2019/10/30
*/
public static String postJson(String url, Map<String, String> headers, Object jsonBody) {
Mono<String> resp = createWebClientWithConnectAndReadTimeOuts().post()
.uri(url)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.headers(t -> t.setAll(headers))
.body(Mono.just(jsonBody), Object.class)
.retrieve().bodyToMono(String.class).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
public static <T> T postRawJson(String url, String jsonBody, Class<T> tClass) {
Mono<T> resp = createWebClientWithConnectAndReadTimeOuts().post()
.uri(url)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(BodyInserters.fromObject(jsonBody))
.retrieve().bodyToMono(tClass).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
public static String postRawJson(String url, String jsonBody) {
Mono<String> resp = createWebClientWithConnectAndReadTimeOuts().post()
.uri(url)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(BodyInserters.fromObject(jsonBody))
.retrieve().bodyToMono(String.class).timeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT));
return resp.block();
}
private static WebClient createWebClientWithConnectAndReadTimeOuts() {
// create reactor netty HTTP client
HttpClient httpClient = HttpClient.create()
.tcpConfiguration(tcpClient -> {
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, DEFAULT_CONNECT_TIMEOUT);
tcpClient = tcpClient.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)));
return tcpClient;
});
// create a client http connector using above http client
ClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
// use this configured http connector to build the web client
return WebClient.builder().clientConnector(connector).build();
}
private static MultiValueMap<String, String> getRequestParamMap(Map<String, String> params) {
MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
for (Map.Entry<String, String> entry : params.entrySet()) {
queryParams.add(entry.getKey(), entry.getValue());
}
return queryParams;
}
private static MultiValueMap<String, String> getRequestParamMapByObj(Object obj) {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = objectMapper.convertValue(obj, new TypeReference<Map<String, Object>>() {
});
MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (Objects.isNull(entry.getValue())) {
continue;
}
queryParams.add(entry.getKey(), String.valueOf(entry.getValue()));
}
return queryParams;
}
}
网友评论