美文网首页
Zuul 1.x 重试策略(源码分析)

Zuul 1.x 重试策略(源码分析)

作者: 丑人林宗己 | 来源:发表于2018-07-08 21:18 被阅读125次

前言

上一篇文章中阐述了zuul的基本架构组成,并且将核心关键类相应做了标注以及分析,但是并未详细深入到关键性的细节,本篇文章主要是是探索zuul超时重试,服务降级的机制。

重试/服务降级机制

很多时候,当一个请求被转发至tomcat服务器处理的过程中,极有可能因为某种原因(比如服务器连接池爆满,比如sql查询太久等等)被卡主,在没有超时重试/服务降级的情况下,此时客户端完全不知情,一直处于等待状态。

重试

指当服务调用方发起请求超过XXXms后,请求还未处理完,则服务调用方会抛出异常,切断请求并进行重试。

比如向目标服务发起请求,不幸的是,由于正巧存在网络波动以至于请求超时过后依旧无法访问到目标服务,或者目标服务返回的结果无法被正确的收到,但是此时目标服务并非是不可服务的状态,所以通过少量重试可以减少由于网络波动等因素所带来的影响。

服务降级

指当服务调用方发起请求超过XXXms后,依旧无法收到正确的响应,则切断请求,接口降级,返回可接受的数据。

当在多次重试后依旧无果,客户端判断此时目标服务不可用(也许目标服务此时并非不可用),但是客户端已经提前预料到存在这样一个问题,与调用方约定服务不可用时将降级为另外接口,以返回特定的数据。

熔断降级机制在广大互联网公司是非常常见的,且在SOA服务,微服务等架构盛行的今天,面对复杂的业务设计,海量的大数据,服务降级策略越发的重要。

目前服务降级的策略也非常多,比如nginx,hystrix……

zuul 1.x的线程模型

想要了解zuul的重试/降级等机制的前提下,有必要优先了解zuul的线程模型。

源自网络.png

从上图可以非常清晰的看出zuul1.x的线程模型,即每一个请求都会以阻塞方式调用处理(经由RibbonRoutingFilter处理的方式)

查看HystrixCommand#queue()源码可以看到如下代码的注释

 /*
 * The Future returned by Observable.toBlocking().toFuture() does not implement the
 * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
 * thus, to comply with the contract of Future, we must wrap around it.
 */
final Future<R> delegate = toObservable().toBlocking().toFuture();

RibbonRoutingFilter转发机制详解

RibbonRoutingFilter#forward

通过debug方式可以看到ribbonCommandFactory其实是HttpClientRibbonCommandFactory实例,并用以创建HttpClientRibbonCommand实例。根据前文看到的zuul的线程模型,可以断定command.execute()的调用肯定是HttpClientRibbonCommand#run()的方法

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
    Map<String, Object> info = this.helper.debug(context.getMethod(),
            context.getUri(), context.getHeaders(), context.getParams(),
            context.getRequestEntity());

    // HttpClientRibbonCommandFactory#create
    // HttpClientRibbonCommand
    RibbonCommand command = this.ribbonCommandFactory.create(context);
    try {
        // HttpClientRibbonCommand#run
        ClientHttpResponse response = command.execute();// queue().get()
        this.helper.appendDebug(info, response.getStatusCode().value(),
                response.getHeaders());
        return response;
    }
    catch (HystrixRuntimeException ex) {
        return handleException(info, ex);
    }
}

HttpClientRibbonCommandFactory#create

在创建HttpClientRibbonCommand之时,也会寻找是否存在相应的降级接口(自定义实现),如果ZuulFallbackProvider如果为空则降级后按照调用HystrixCommand#getFallback()抛出异常UnsupportedOperationException("No fallback available.")

@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
    // ZuulFallbackProvider降级接口,每个serviceId对应一个
    // Hystrix 熔断时会调用该接口
    ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
    final String serviceId = context.getServiceId();
    
    // 成功开启重试后的值为RetryableRibbonLoadBalancingHttpClient
    // 非成功开启重试为RibbonLoadBalancingHttpClient
    final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
            serviceId, RibbonLoadBalancingHttpClient.class);
    client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));

    return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
            clientFactory.getClientConfig(serviceId));
}

内部如何决策开启重试机制呢?

从创建bean的条件看,归根结底是根据是否引入srping-retry来决定是否创建重试实例

@Configuration
@ConditionalOnClass(name = "org.apache.http.client.HttpClient")
@ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)
public class HttpClientRibbonConfiguration {
    @RibbonClientName
    private String name = "client";

    // ....

    @Bean
    @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    // 创建bean的条件是org.springframework.retry.support.RetryTemplate不存在
    @ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate")
    public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
        IClientConfig config, ServerIntrospector serverIntrospector,
        ILoadBalancer loadBalancer, RetryHandler retryHandler, CloseableHttpClient httpClient) {
        RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(httpClient, config, serverIntrospector);
        client.setLoadBalancer(loadBalancer);
        client.setRetryHandler(retryHandler);
        Monitors.registerObject("Client_" + this.name, client);
        return client;
    }

    @Bean
    @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    // 创建bean的条件是org.springframework.retry.support.RetryTemplate存在
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
            IClientConfig config, ServerIntrospector serverIntrospector,
            ILoadBalancer loadBalancer, RetryHandler retryHandler,
            LoadBalancedRetryFactory loadBalancedRetryFactory, CloseableHttpClient httpClient) {
        RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(
            httpClient, config, serverIntrospector, loadBalancedRetryFactory);
        client.setLoadBalancer(loadBalancer);
        client.setRetryHandler(retryHandler);
        Monitors.registerObject("Client_" + this.name, client);
        return client;
    }
}
image.png

HttpClientRibbonCommand#AbstractRibbonCommand#run

前文提到,执行command.execute的时候会执行HttpClientRibbonCommand#run,但是由于HttpClientRibbonCommand没有找到run方法,所以前往父类AbstractRibbonCommand寻找run方法

final RequestContext context = RequestContext.getCurrentContext();

    RQ request = createRequest();
    // RibbonLoadBalancingHttpClient#AbstractLoadBalancerAwareClient#executeWithLoadBalancer
    // RetryableRibbonLoadBalancingHttpClient#AbstractLoadBalancerAwareClient#executeWithLoadBalancer
    RS response = this.client.executeWithLoadBalancer(request, config);

    context.set("ribbonResponse", response);

    // Explicitly close the HttpResponse if the Hystrix command timed out to
    // release the underlying HTTP connection held by the response.
    //
    if (this.isResponseTimedOut()) {
        if (response != null) {
            response.close();
        }
    }

    return new RibbonHttpResponse(response);
}

AbstractLoadBalancerAwareClient#

这里涉及到Observable相当多的API, 基于RxJava框架,相关的知识可以前往官网或者其他博文了解,这里不做多余赘述。

关键代码在于AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)究竟做了什么事?

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    // 请求重试处理器
    RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
    LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
            .withLoadBalancerContext(this)
            .withRetryHandler(handler)
            .withLoadBalancerURI(request.getUri())
            .build();

    try {
        // 将请求执行包装在Observable
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
}

RibbonLoadBalancingHttpClient#getRequestSpecificRetryHandler
RetryableRibbonLoadBalancingHttpClient#getRequestSpecificRetryHandler

查看如下源码发现
okToRetryOnConnectErrors,okToRetryOnAllErrors都被初始化为false
fallback被初始化为DefaultLoadBalancerRetryHandler

@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
    return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
}

public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors, RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) {
    Preconditions.checkNotNull(baseRetryHandler);
    this.okToRetryOnConnectErrors = okToRetryOnConnectErrors;
    this.okToRetryOnAllErrors = okToRetryOnAllErrors;
    this.fallback = baseRetryHandler;
    if (requestConfig != null) {
        if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) {
            retrySameServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetries); 
        }
        if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) {
            retryNextServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer); 
        } 
    }
}

LoadBalancerCommand#submit

该方法代码量较多,且多数为Observable代码,截取其中关键信息查看

// 同一个服务地址最大重试次数,且根据创建条件, 该值走到
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
// 整个集群内部同一个服务的多个实例的最大重试次数
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

// 从创建RequestSpecificRetryHandler的条件看,maxRetrysSame 与 maxRetrysNext 都是0,
// 也就说下边的重试条件永远不可能发生,详细请查阅DefaultLoadBalancerRetryHandler源码

if (maxRetrysSame > 0) 
     o = o.retry(retryPolicy(maxRetrysSame, true));
if (maxRetrysNext > 0 && server == null) 
    o = o.retry(retryPolicy(maxRetrysNext, false));

// 重试策略,也可以称之为判定是否重试
private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
    return new Func2<Integer, Throwable, Boolean>() {
        @Override
        public Boolean call(Integer tryCount, Throwable e) {
            if (e instanceof AbortExecutionException) {
                return false;
            }

            if (tryCount > maxRetrys) {
                return false;
            }
            
            if (e.getCause() != null && e instanceof RuntimeException) {
                e = e.getCause();
            }
            
            return retryHandler.isRetriableException(e, same);
        }
    };
}

@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
    if (okToRetryOnAllErrors) {
        // 查看刚刚的源码发现,不管是否重试,这里的值都被设置为false,所以这里不可能返回
        return true;
    } 
    else if (e instanceof ClientException) {
        // 如果是客户端异常信息
        ClientException ce = (ClientException) e;
        // 客户端限流
        if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
            // sameServer是指是否都是同一个sever
            // 一旦某一个server实例跑了异常,则不再对该服务进行重试
            // 不同实例地址则重试
            return !sameServer;
        } else {
            // 否则不再重试
            return false;
        }
    } 
    else  {
        // 必false
        return okToRetryOnConnectErrors && isConnectionException(e);
    }
}

从源码上看,咋看以为重试的策略是主动去触发Observable#retry重试机制进行重试,但是通过bebug的方式却发现太天真了。因为在通过getRequestSpecificRetryHandler方法创建的RequestSpecificRetryHandler都是一样的,内部的RetryHandler都是默认构造的DefaultLoadBalancerRetryHandler,所以retrySameServerretryNextServer都是0,也就说通过触发Observable#retry的机制至少在这个版本是不会发生的。

那么重试的机制明显就交给了spring-retry来处理,那么具体的处理方式又定义在何处呢?

RetryableRibbonLoadBalancingHttpClient#execute

@Override
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
    //
    final RequestConfig.Builder builder = RequestConfig.custom();
    IClientConfig config = configOverride != null ? configOverride : this.config;
    builder.setConnectTimeout(config.get(
            CommonClientConfigKey.ConnectTimeout, this.connectTimeout));// 默认2s
    builder.setSocketTimeout(config.get(
            CommonClientConfigKey.ReadTimeout, this.readTimeout)); // 默认5s
    builder.setRedirectsEnabled(config.get(
            CommonClientConfigKey.FollowRedirects, this.followRedirects));

    final RequestConfig requestConfig = builder.build();
    return this.executeWithRetry(request, new RetryCallback() {
        // ....
    });
}

private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, RetryCallback<RibbonApacheHttpResponse, IOException> callback) throws Exception {
    LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);//RibbonLoadBalancedRetryPolicyFactory
    RetryTemplate retryTemplate = new RetryTemplate();
    boolean retryable = request.getContext() == null ? true :
            BooleanUtils.toBooleanDefaultIfNull(request.getContext().getRetryable(), true);
    retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()
            : new RetryPolicy(request, retryPolicy, this, this.getClientName()));//RetryableRibbonLoadBalancingHttpClient
    return retryTemplate.execute(callback);
}

@Override
public LoadBalancedRetryPolicy create(final String serviceId, final ServiceInstanceChooser loadBalanceChooser) {
    final RibbonLoadBalancerContext lbContext = this.clientFactory
            .getLoadBalancerContext(serviceId);
    return new LoadBalancedRetryPolicy() { // 由于这里是匿名实例,所以可能会比较难找

        // 用以判断是否重试相同的服务实例
        @Override
        public boolean canRetrySameServer(LoadBalancedRetryContext context) {
            return sameServerCount < lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context);
        }

        // 用以判断是否重试集群内下一个服务实例
        @Override
        public boolean canRetryNextServer(LoadBalancedRetryContext context) {
            //this will be called after a failure occurs and we increment the counter
            //so we check that the count is less than or equals to too make sure
            //we try the next server the right number of times
            return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer() && canRetry(context);
        }

        @Override
        public void close(LoadBalancedRetryContext context) {

        }

        @Override
        public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
            //Check if we need to ask the load balancer for a new server.
            //Do this before we increment the counters because the first call to this method
            //is not a retry it is just an initial failure.
            if(!canRetrySameServer(context)  && canRetryNextServer(context)) {
                context.setServiceInstance(loadBalanceChooser.choose(serviceId));
            }
            //This method is called regardless of whether we are retrying or making the first request.
            //Since we do not count the initial request in the retry count we don't reset the counter
            //until we actually equal the same server count limit.  This will allow us to make the initial
            //request plus the right number of retries.
            if(sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context)) {
                //reset same server since we are moving to a new server
                sameServerCount = 0;
                nextServerCount++;
                if(!canRetryNextServer(context)) {
                    context.setExhaustedOnly();
                }
            } else {
                sameServerCount++;
            }

        }
    };
}

以上基本上把zuul的一次请求(包括开启重试功能以及不开启重试功能)的全部过程都了解了一遍,讲道理应该对zuul的请求转发有了比较深刻的了解。

总结

请求流至RibbonRoutingFilter之后,决定是否重试的功能点在于是否引入了spring-retry包,能否找到org.springframework.retry.support.RetryTemplate这个全限定类名。如果找到则顺利开启重试机制,否则不开启重启机制。

除此之外,由于RibbonCommand继承了HystrixExecutable,理论上具备了熔断降级策略的,测试是否具备熔断降级策略,可以继承自ZuulFallbackProvider,并将实现类加入到spring容器中(@Component)。

从源码分析的角度来看,熔断降级策略spring-retry并没有产生直接的关系,也就说当请求发起重试的时候,即便已经被降级了之后,后端却还是重试,并且在重试过程中,在发生降级之后,后边所有的重试其实都是无意义的重试,因为不管重试是否成功,最后的返回值都是降级后的接口返回的数据。

经过测试发现,熔断降级策略默认是1s降级,而超时重试默认为5s(请查看前文源码注释)。

调试实战

  • 在zuul应用的加入spring-retry依赖
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
    <version>1.1.2.RELEASE</version>
</dependency>
  • zuul的配置文件加入zuul.retryable=trueribbon.MaxAutoRetries=3ribbon.MaxAutoRetriesNextServer=2
eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
server:
  port: 8769
spring:
  application:
    name: service-zuul
zuul:
  routes:
    api-a:
      path: /api-a/**
      serviceId: service-ribbon
  retryable: true
  
  
ribbon:
  MaxAutoRetries: 3
  MaxAutoRetriesNextServer: 2

创建熔断后降级接口

@Component
public class MyZuulFallbackProvider implements ZuulFallbackProvider {

    @Override
    public String getRoute() {
        return "service-ribbon";
    }

    @Override
    public ClientHttpResponse fallbackResponse() {
        return new ClientHttpResponse() {

            @Override
            public InputStream getBody() throws IOException {
                Map<String, Object> map = new HashMap<String, Object>();
                map.put("code", 1);
                map.put("text", "error");
                final byte[] reqBodyBytes = map.toString().getBytes();
                return new ServletInputStreamWrapper(reqBodyBytes);
            }

            @Override
            public HttpHeaders getHeaders() {
                return new HttpHeaders();
            }

            @Override
            public HttpStatus getStatusCode() throws IOException {
                // TODO Auto-generated method stub
                return HttpStatus.OK;
            }

            @Override
            public int getRawStatusCode() throws IOException {
                return 0;
            }

            @Override
            public String getStatusText() throws IOException {
                return "201 error";
            }

            @Override
            public void close() {

            }

        };
    }
}

在服务被调用方中加入一个count来计算重试的次数(count值只用一次,做简单验证足以)

@RestController
public class HelloControler {
    
    private Integer count = 4;
    
    @Autowired
    HelloService helloService;
    
    @RequestMapping(value = "/hi")
    public String hi(@RequestParam String name){
        if( 0 == count --) {
            // 当尝试第4次请求时,直接返回。
            return "hi has bean hystrix";
        }
        System.out.println("request is coming...");
           try {
              Thread.sleep(10000);
           } catch (InterruptedException e) {
              System.out.println("线程被打断... " + e.getMessage());
           }
        return name;
    }
}

总结

阅读源码是一件让人兴奋愉悦,却有苦不堪言的事,但是坚持下来就好,本来想画一下整个调用过程的相关类图,可是有点懒,就不画了……

老外写的代码感觉更难以看懂一些,不过还好,基本的设计模式没问题,配合编辑以看起来也就不是很累了。

zuul的源码阅读估计就到这里了,其他的坑等后续遇见了再学习。不过重试与降级的问题(降级后继续重试的问题),简直不能忍,是不是这个问题会在zuul2.x版本中解决呢?

相关文章

  • Zuul 1.x 重试策略(源码分析)

    前言 上一篇文章中阐述了zuul的基本架构组成,并且将核心关键类相应做了标注以及分析,但是并未详细深入到关键性的细...

  • Spring Cloud Zuul源码分析

    在Spring Cloud Zuul中介绍了Zuul 1.x的基本使用,本文从源码角度介绍一下zuul的底层实现。...

  • Zuul源码分析

    目标 明确Zuul的执行流程和重要类的分析 Zuul过滤器的生命周期 源码分析 zuul怎么拦截我们的请求? Zu...

  • zuul 1.x 源码解析

    1、阅读环境 基于spring cloud 的zuul组件,源码亦是从demo中入手,方式以debug为主。 zu...

  • Zuul源码分析(一) FilterFileManager

    现在我们开始分析Zuul的源码。首先来说一下Zuul是什么。zuul 是netflix开源的一个API Gatew...

  • Spring Cloud Gateway 入门

    简介 Spring Cloud Gateway ,相比之前我们使用的 Zuul(1.x) 它有哪些优势呢?Zuul...

  • Spring Cloud(6) Zuul - Ribbon、Hy

    目标 开启zuul重试机制 配置Ribbon 配置hystrix zuul本身已经引入了对Ribbon、hystr...

  • Zuul 源码分析

    zuul是spring cloud 微服务体系中的网关,可以路由请求到具体的服务,同时做一些验签,解密等的与业务无...

  • Zuul源码分析

    ZUUL ZUUL功能如下: 认证 动态路由 安全 静态response处理 压力测试 流控 核心概念 type:...

  • 【九】Spring Cloud zuul

    zuul相当于是路由网关,默认结合了Ribbon。Zuul(1.x) 基于 Servlet,使用阻塞 API,它不...

网友评论

      本文标题:Zuul 1.x 重试策略(源码分析)

      本文链接:https://www.haomeiwen.com/subject/aseouftx.html