前言
上一篇文章中阐述了zuul的基本架构组成,并且将核心关键类相应做了标注以及分析,但是并未详细深入到关键性的细节,本篇文章主要是是探索zuul超时重试,服务降级的机制。
重试/服务降级机制
很多时候,当一个请求被转发至tomcat服务器处理的过程中,极有可能因为某种原因(比如服务器连接池爆满,比如sql查询太久等等)被卡主,在没有超时重试/服务降级的情况下,此时客户端完全不知情,一直处于等待状态。
重试
指当服务调用方发起请求超过XXXms后,请求还未处理完,则服务调用方会抛出异常,切断请求并进行重试。
比如向目标服务发起请求,不幸的是,由于正巧存在网络波动以至于请求超时过后依旧无法访问到目标服务,或者目标服务返回的结果无法被正确的收到,但是此时目标服务并非是不可服务的状态,所以通过少量重试可以减少由于网络波动等因素所带来的影响。
服务降级
指当服务调用方发起请求超过XXXms后,依旧无法收到正确的响应,则切断请求,接口降级,返回可接受的数据。
当在多次重试后依旧无果,客户端判断此时目标服务不可用(也许目标服务此时并非不可用),但是客户端已经提前预料到存在这样一个问题,与调用方约定服务不可用时将降级为另外接口,以返回特定的数据。
熔断降级机制在广大互联网公司是非常常见的,且在SOA服务,微服务等架构盛行的今天,面对复杂的业务设计,海量的大数据,服务降级策略越发的重要。
目前服务降级的策略也非常多,比如nginx,hystrix……
zuul 1.x的线程模型
想要了解zuul的重试/降级等机制的前提下,有必要优先了解zuul的线程模型。
源自网络.png从上图可以非常清晰的看出zuul1.x的线程模型,即每一个请求都会以阻塞方式调用处理(经由RibbonRoutingFilter
处理的方式)
查看HystrixCommand#queue()
源码可以看到如下代码的注释
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
final Future<R> delegate = toObservable().toBlocking().toFuture();
RibbonRoutingFilter转发机制详解
RibbonRoutingFilter#forward
通过debug方式可以看到ribbonCommandFactory
其实是HttpClientRibbonCommandFactory
实例,并用以创建HttpClientRibbonCommand
实例。根据前文看到的zuul的线程模型,可以断定command.execute()
的调用肯定是HttpClientRibbonCommand#run()
的方法
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
Map<String, Object> info = this.helper.debug(context.getMethod(),
context.getUri(), context.getHeaders(), context.getParams(),
context.getRequestEntity());
// HttpClientRibbonCommandFactory#create
// HttpClientRibbonCommand
RibbonCommand command = this.ribbonCommandFactory.create(context);
try {
// HttpClientRibbonCommand#run
ClientHttpResponse response = command.execute();// queue().get()
this.helper.appendDebug(info, response.getStatusCode().value(),
response.getHeaders());
return response;
}
catch (HystrixRuntimeException ex) {
return handleException(info, ex);
}
}
HttpClientRibbonCommandFactory#create
在创建HttpClientRibbonCommand
之时,也会寻找是否存在相应的降级接口(自定义实现),如果ZuulFallbackProvider
如果为空则降级后按照调用HystrixCommand#getFallback()
抛出异常UnsupportedOperationException("No fallback available.")
@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
// ZuulFallbackProvider降级接口,每个serviceId对应一个
// Hystrix 熔断时会调用该接口
ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
final String serviceId = context.getServiceId();
// 成功开启重试后的值为RetryableRibbonLoadBalancingHttpClient
// 非成功开启重试为RibbonLoadBalancingHttpClient
final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
serviceId, RibbonLoadBalancingHttpClient.class);
client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
clientFactory.getClientConfig(serviceId));
}
内部如何决策开启重试机制呢?
从创建bean的条件看,归根结底是根据是否引入srping-retry
来决定是否创建重试实例
@Configuration
@ConditionalOnClass(name = "org.apache.http.client.HttpClient")
@ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)
public class HttpClientRibbonConfiguration {
@RibbonClientName
private String name = "client";
// ....
@Bean
@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
// 创建bean的条件是org.springframework.retry.support.RetryTemplate不存在
@ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate")
public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
IClientConfig config, ServerIntrospector serverIntrospector,
ILoadBalancer loadBalancer, RetryHandler retryHandler, CloseableHttpClient httpClient) {
RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(httpClient, config, serverIntrospector);
client.setLoadBalancer(loadBalancer);
client.setRetryHandler(retryHandler);
Monitors.registerObject("Client_" + this.name, client);
return client;
}
@Bean
@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
// 创建bean的条件是org.springframework.retry.support.RetryTemplate存在
@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
IClientConfig config, ServerIntrospector serverIntrospector,
ILoadBalancer loadBalancer, RetryHandler retryHandler,
LoadBalancedRetryFactory loadBalancedRetryFactory, CloseableHttpClient httpClient) {
RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(
httpClient, config, serverIntrospector, loadBalancedRetryFactory);
client.setLoadBalancer(loadBalancer);
client.setRetryHandler(retryHandler);
Monitors.registerObject("Client_" + this.name, client);
return client;
}
}
image.png
HttpClientRibbonCommand#AbstractRibbonCommand#run
前文提到,执行command.execute
的时候会执行HttpClientRibbonCommand#run
,但是由于HttpClientRibbonCommand
没有找到run
方法,所以前往父类AbstractRibbonCommand
寻找run
方法
final RequestContext context = RequestContext.getCurrentContext();
RQ request = createRequest();
// RibbonLoadBalancingHttpClient#AbstractLoadBalancerAwareClient#executeWithLoadBalancer
// RetryableRibbonLoadBalancingHttpClient#AbstractLoadBalancerAwareClient#executeWithLoadBalancer
RS response = this.client.executeWithLoadBalancer(request, config);
context.set("ribbonResponse", response);
// Explicitly close the HttpResponse if the Hystrix command timed out to
// release the underlying HTTP connection held by the response.
//
if (this.isResponseTimedOut()) {
if (response != null) {
response.close();
}
}
return new RibbonHttpResponse(response);
}
AbstractLoadBalancerAwareClient#
这里涉及到Observable相当多的API, 基于RxJava框架,相关的知识可以前往官网或者其他博文了解,这里不做多余赘述。
关键代码在于AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)
究竟做了什么事?
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
// 请求重试处理器
RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
.withLoadBalancerContext(this)
.withRetryHandler(handler)
.withLoadBalancerURI(request.getUri())
.build();
try {
// 将请求执行包装在Observable
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
RibbonLoadBalancingHttpClient#getRequestSpecificRetryHandler
RetryableRibbonLoadBalancingHttpClient#getRequestSpecificRetryHandler
查看如下源码发现
okToRetryOnConnectErrors
,okToRetryOnAllErrors
都被初始化为false
fallback
被初始化为DefaultLoadBalancerRetryHandler
@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
}
public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors, RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) {
Preconditions.checkNotNull(baseRetryHandler);
this.okToRetryOnConnectErrors = okToRetryOnConnectErrors;
this.okToRetryOnAllErrors = okToRetryOnAllErrors;
this.fallback = baseRetryHandler;
if (requestConfig != null) {
if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) {
retrySameServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetries);
}
if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) {
retryNextServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer);
}
}
}
LoadBalancerCommand#submit
该方法代码量较多,且多数为Observable代码,截取其中关键信息查看
// 同一个服务地址最大重试次数,且根据创建条件, 该值走到
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
// 整个集群内部同一个服务的多个实例的最大重试次数
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// 从创建RequestSpecificRetryHandler的条件看,maxRetrysSame 与 maxRetrysNext 都是0,
// 也就说下边的重试条件永远不可能发生,详细请查阅DefaultLoadBalancerRetryHandler源码
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
// 重试策略,也可以称之为判定是否重试
private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
return new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer tryCount, Throwable e) {
if (e instanceof AbortExecutionException) {
return false;
}
if (tryCount > maxRetrys) {
return false;
}
if (e.getCause() != null && e instanceof RuntimeException) {
e = e.getCause();
}
return retryHandler.isRetriableException(e, same);
}
};
}
@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
if (okToRetryOnAllErrors) {
// 查看刚刚的源码发现,不管是否重试,这里的值都被设置为false,所以这里不可能返回
return true;
}
else if (e instanceof ClientException) {
// 如果是客户端异常信息
ClientException ce = (ClientException) e;
// 客户端限流
if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
// sameServer是指是否都是同一个sever
// 一旦某一个server实例跑了异常,则不再对该服务进行重试
// 不同实例地址则重试
return !sameServer;
} else {
// 否则不再重试
return false;
}
}
else {
// 必false
return okToRetryOnConnectErrors && isConnectionException(e);
}
}
从源码上看,咋看以为重试的策略是主动去触发Observable#retry
重试机制进行重试,但是通过bebug的方式却发现太天真了。因为在通过getRequestSpecificRetryHandler
方法创建的RequestSpecificRetryHandler
都是一样的,内部的RetryHandler
都是默认构造的DefaultLoadBalancerRetryHandler
,所以retrySameServer
与retryNextServer
都是0,也就说通过触发Observable#retry
的机制至少在这个版本是不会发生的。
那么重试的机制明显就交给了spring-retry
来处理,那么具体的处理方式又定义在何处呢?
RetryableRibbonLoadBalancingHttpClient#execute
@Override
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
//
final RequestConfig.Builder builder = RequestConfig.custom();
IClientConfig config = configOverride != null ? configOverride : this.config;
builder.setConnectTimeout(config.get(
CommonClientConfigKey.ConnectTimeout, this.connectTimeout));// 默认2s
builder.setSocketTimeout(config.get(
CommonClientConfigKey.ReadTimeout, this.readTimeout)); // 默认5s
builder.setRedirectsEnabled(config.get(
CommonClientConfigKey.FollowRedirects, this.followRedirects));
final RequestConfig requestConfig = builder.build();
return this.executeWithRetry(request, new RetryCallback() {
// ....
});
}
private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, RetryCallback<RibbonApacheHttpResponse, IOException> callback) throws Exception {
LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);//RibbonLoadBalancedRetryPolicyFactory
RetryTemplate retryTemplate = new RetryTemplate();
boolean retryable = request.getContext() == null ? true :
BooleanUtils.toBooleanDefaultIfNull(request.getContext().getRetryable(), true);
retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()
: new RetryPolicy(request, retryPolicy, this, this.getClientName()));//RetryableRibbonLoadBalancingHttpClient
return retryTemplate.execute(callback);
}
@Override
public LoadBalancedRetryPolicy create(final String serviceId, final ServiceInstanceChooser loadBalanceChooser) {
final RibbonLoadBalancerContext lbContext = this.clientFactory
.getLoadBalancerContext(serviceId);
return new LoadBalancedRetryPolicy() { // 由于这里是匿名实例,所以可能会比较难找
// 用以判断是否重试相同的服务实例
@Override
public boolean canRetrySameServer(LoadBalancedRetryContext context) {
return sameServerCount < lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context);
}
// 用以判断是否重试集群内下一个服务实例
@Override
public boolean canRetryNextServer(LoadBalancedRetryContext context) {
//this will be called after a failure occurs and we increment the counter
//so we check that the count is less than or equals to too make sure
//we try the next server the right number of times
return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer() && canRetry(context);
}
@Override
public void close(LoadBalancedRetryContext context) {
}
@Override
public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
//Check if we need to ask the load balancer for a new server.
//Do this before we increment the counters because the first call to this method
//is not a retry it is just an initial failure.
if(!canRetrySameServer(context) && canRetryNextServer(context)) {
context.setServiceInstance(loadBalanceChooser.choose(serviceId));
}
//This method is called regardless of whether we are retrying or making the first request.
//Since we do not count the initial request in the retry count we don't reset the counter
//until we actually equal the same server count limit. This will allow us to make the initial
//request plus the right number of retries.
if(sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context)) {
//reset same server since we are moving to a new server
sameServerCount = 0;
nextServerCount++;
if(!canRetryNextServer(context)) {
context.setExhaustedOnly();
}
} else {
sameServerCount++;
}
}
};
}
以上基本上把zuul的一次请求(包括开启重试功能以及不开启重试功能)的全部过程都了解了一遍,讲道理应该对zuul的请求转发有了比较深刻的了解。
总结
请求流至RibbonRoutingFilter
之后,决定是否重试的功能点在于是否引入了spring-retry
包,能否找到org.springframework.retry.support.RetryTemplate
这个全限定类名。如果找到则顺利开启重试机制,否则不开启重启机制。
除此之外,由于RibbonCommand
继承了HystrixExecutable
,理论上具备了熔断降级策略
的,测试是否具备熔断降级策略
,可以继承自ZuulFallbackProvider
,并将实现类加入到spring容器中(@Component
)。
从源码分析的角度来看,熔断降级策略
与spring-retry
并没有产生直接的关系,也就说当请求发起重试的时候,即便已经被降级了之后,后端却还是重试,并且在重试过程中,在发生降级之后,后边所有的重试其实都是无意义的重试,因为不管重试是否成功,最后的返回值都是降级后的接口返回的数据。
经过测试发现,熔断降级策略
默认是1s降级,而超时重试默认为5s(请查看前文源码注释)。
调试实战
- 在zuul应用的加入
spring-retry
依赖
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.1.2.RELEASE</version>
</dependency>
- zuul的配置文件加入
zuul.retryable=true
,ribbon.MaxAutoRetries=3
,ribbon.MaxAutoRetriesNextServer=2
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
server:
port: 8769
spring:
application:
name: service-zuul
zuul:
routes:
api-a:
path: /api-a/**
serviceId: service-ribbon
retryable: true
ribbon:
MaxAutoRetries: 3
MaxAutoRetriesNextServer: 2
创建熔断后降级接口
@Component
public class MyZuulFallbackProvider implements ZuulFallbackProvider {
@Override
public String getRoute() {
return "service-ribbon";
}
@Override
public ClientHttpResponse fallbackResponse() {
return new ClientHttpResponse() {
@Override
public InputStream getBody() throws IOException {
Map<String, Object> map = new HashMap<String, Object>();
map.put("code", 1);
map.put("text", "error");
final byte[] reqBodyBytes = map.toString().getBytes();
return new ServletInputStreamWrapper(reqBodyBytes);
}
@Override
public HttpHeaders getHeaders() {
return new HttpHeaders();
}
@Override
public HttpStatus getStatusCode() throws IOException {
// TODO Auto-generated method stub
return HttpStatus.OK;
}
@Override
public int getRawStatusCode() throws IOException {
return 0;
}
@Override
public String getStatusText() throws IOException {
return "201 error";
}
@Override
public void close() {
}
};
}
}
在服务被调用方中加入一个count来计算重试的次数(count
值只用一次,做简单验证足以)
@RestController
public class HelloControler {
private Integer count = 4;
@Autowired
HelloService helloService;
@RequestMapping(value = "/hi")
public String hi(@RequestParam String name){
if( 0 == count --) {
// 当尝试第4次请求时,直接返回。
return "hi has bean hystrix";
}
System.out.println("request is coming...");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("线程被打断... " + e.getMessage());
}
return name;
}
}
总结
阅读源码是一件让人兴奋愉悦,却有苦不堪言的事,但是坚持下来就好,本来想画一下整个调用过程的相关类图,可是有点懒,就不画了……
老外写的代码感觉更难以看懂一些,不过还好,基本的设计模式没问题,配合编辑以看起来也就不是很累了。
zuul的源码阅读估计就到这里了,其他的坑等后续遇见了再学习。不过重试与降级的问题(降级后继续重试的问题),简直不能忍,是不是这个问题会在zuul2.x版本中解决呢?
网友评论