美文网首页
RPC(七)

RPC(七)

作者: supremecsp | 来源:发表于2022-06-22 14:20 被阅读0次

eureka客户端获取到注册表后,如果接口有多个服务提供,是如何选择调用哪一个呢?
最底层负载代码的实现,具体代码在
com.netflix.discovery.shared.resolver.ResolverUtils.randomize() 中:



可以看到,是通过我们EurekaClient端的ipv4做为随机的种子,生成一个重新排序的serverList

private static int getZoneOffset(String myZone, boolean preferSameZone, String[] availZones) {
   for (int i = 0; i < availZones.length; i++) {
       if (myZone != null && (availZones[i].equalsIgnoreCase(myZone.trim()) == preferSameZone)) {
           return i;
       }
   }
   logger.warn("DISCOVERY: Could not pick a zone based on preferred zone settings. My zone - {}," +
           " preferSameZone- {}. Defaulting to " + availZones[0], myZone, preferSameZone);
 
   return 0;
}

当方法参数 preferSameZone=true ,即 eureka.preferSameZone=true( 默认值 :true ) 时,开始位置为可用区数组( availZones )的第一个和应用实例所在的可用区( myZone )【相等】元素的位置。
当方法参数 preferSameZone=false ,即 eureka.preferSameZone=false( 默认值 :true ) 时,开始位置为可用区数组( availZones )的第一个和应用实例所在的可用区( myZone )【不相等】元素的位置。

故障转移原理

我们的serverList按照client端的ip进行重排序后,每次都会请求第一个元素作为和Server端交互的host,如果请求失败,会尝试请求serverList列表中的第二个元素继续请求,这次请求成功后,会将此次请求的host放到全局的一个变量中保存起来,下次client端再次请求 就会直接使用这个host。


我们来分析下这个代码:
第101行,获取client上次成功server端的host,如果有值则直接使用这个host
第105行,getHostCandidates()是获取client端配置的serverList数据,且通过ip进行重排序的列表
第114行,candidateHosts.get(endpointIdx++),初始endpointIdx=0,获取列表中第1个元素作为host请求
第120行,获取返回的response结果,如果返回的状态码是200,则将此次请求的host设置到全局的delegate变量中
第133行,执行到这里说明第120行执行的response返回的状态码不是200,也就是执行失败,将全局变量delegate中的数据清空
再次循环第一步,此时endpointIdx=1,获取列表中的第二个元素作为host请求
依次执行,第100行的循环条件numberOfRetries=3,最多重试2次就会跳出循环
文章来源:生产的一个问题的通彻思考:Eureka注册中心集群如何实现客户端请求负载及故障转移?,Eureka 源码解析 —— EndPoint 与 解析器

由于IP进行负载,情况并不是很均匀,程序中可以引入spring-cloud-loadbalancer进行负载,默认均衡策略是RoundRobinLoadBalancer轮询节点

 @Bean
    @ConditionalOnMissingBean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
            LoadBalancerClientFactory loadBalancerClientFactory) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RoundRobinLoadBalancer(
                loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
    }

spring-cloud-loadbalancer使用:注意了,ribbon将被替换

RPC 框架里面的一个核心的功能——健康检测,它能帮助我们从连接列表里面过滤掉一些存在问题的节点,避免在发请求的时候选择出有问题的节点而影响业务。
除了负载均衡外,我们可以通过在注册中心配置一些:同一实例最大尝试次数(spring.cloud.loadbalancer.retry.max-retries-on-same-service-instance),其他实例最大尝试次数(spring.cloud.loadbalancer.retry.max-retries-on-next-service-instance)
配置 ReadTimeout:读取超时,也叫响应超时。ConnectTimeout:连接超时

ribbon:
  ConnectTimeout: 10000 # 连接超时时间(ms)
  ReadTimeout: 10000 # 通信超时时间(ms)
  OkToRetryOnAllOperations: false # 是否对所有操作重试 默认为false
  MaxAutoRetriesNextServer: 0 # 同一服务不同实例的重试次数 默认为1
  MaxAutoRetries: 0 # 同一实例的重试次数 默认为0

最大重试次数 = (1+maxAutoRetries)*(1+maxAutoRetriesNextServer)次

均衡策略可参考:Ribbon负载均衡策略

eureka注册最长发现时间:《spring cloud 微服务架构开发实践》
实际进行微服务架构开发过程中,经常会遇到一个服务实例上线后需要很长一段时间才能够被其他服务调用者获取和使用,并不能在微服务一上线就立即被获取到,这是Eureka的机制造成的。在Eureak服务治理环境下,一个微服务上线有三处缓存处理和一处延迟处理,经过这些处理后才能够被服务消费方获取到并使用,它们分别是:
1,Eureka服务器对服务注册列表进行缓存,默认时间为30秒。所以即使一个服务实例刚刚注册成功,它也可能不会立即在/eureka/apps端点的结果中出现。
2,Eureka客户端(服务消费方)对注册的服务信息进行缓存,默认时间为30秒,也就是说客户端决定刷新本地缓存并发现其他新注册的实例可能需要30秒。
3,Ribbon负载均衡会从Eureka客户端获取服务列表,并将负载均衡后的结果缓存30秒,所以对于Eureka客户端新同步过来的服务节点,可能也会需要30秒之后才能被负载均衡使用。
4,服务实例(Eureka客户端)在启动时(不是启动完成),不是立即向Eureka服务器注册,而是在一个延迟时间(默认为40秒)之后才向Eureka服务器注册。
综上几个因素,一个新的服务实例,即使能够很快启动的实例,也不能马上被Eureka服务器发现,其他服务消费者需要一段时间,最长可能需要2分钟以上,才能够被发现和使用。

《RPC 实战与核心原理》
我们要确保被调用的服务的业务逻辑是幂等的,这样我们才能考虑根据事件情况开启 RPC 框架的异常重试功能。
在重试的过程中,为了能够在约定的时间内进行安全可靠地重试,在每次触发重试之前,
1,我们需要先判定下这个请求是否已经超时,如果超时了会直接返回超时异常,否则我们需要重置下这个请求的超时时间,防止因多次重试导致这个请求的处理时间超过用户配置的超时时间,从而影响到业务处理的耗时。
2,在发起重试、负载均衡选择节点的时候,我们应该去掉重试之前出现过问题的那个节点,这样可以提高重试的成功率,并且我们允许用户配置可重试异常的白名单,这样可以让 RPC 框架的异常重试功能变得更加友好。

要配置Feign读取超时,必须同时配置连接超时



同时配置Feign和Ribbon的超时,以Feign为准


Ribbon框架超时和重置并没有强关联,比如一个请求2秒超时后仍能触发重试机制。可能再过2秒继续超时



对于可重试异常的白名单,Ribbon可以继承FeignLoadBalancer并重写getRequestSpecificRetryHandler方法来替换RetryHandler


/**
     * 自定义FeignLoadBalancer,替换默认的RequestSpecificRetryHandler
     */
    public static class MyFeignLoadBalancer extends FeignLoadBalancer {

        public MyFeignLoadBalancer(ILoadBalancer lb, IClientConfig clientConfig, ServerIntrospector serverIntrospector) {
            super(lb, clientConfig, serverIntrospector);
        }

        @Override
        public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonRequest request, IClientConfig requestConfig) {
            // 返回自定义的RequestSpecificRetryHandler
            // 参数一:是否连接异常重试时重试
            // 参数二:是否所有异常都重试
            return new RequestSpecificRetryHandler(false, false,
                    getRetryHandler(), requestConfig) {
                /**
                 * @param e 抛出的异常
                 * @param sameServer 是否同节点服务的重试
                 * @return
                 */
                @Override
                public boolean isRetriableException(Throwable e, boolean sameServer) {
                    if (e instanceof ClientException) {
                        // 连接异常重试
                        if (((ClientException) e).getErrorType() == ClientException.ErrorType.CONNECT_EXCEPTION) {
                            return true;
                        }
                        // 连接超时重试
                        if (((ClientException) e).getErrorType() == ClientException.ErrorType.SOCKET_TIMEOUT_EXCEPTION) {
                            return true;
                        }
                        // 读超时重试,读超时重试只允许不同服务节点的重试
                        // 所以同节点的重试不支持,读超时了就不要重新请求同一个节点了。
                        if (((ClientException) e).getErrorType() == ClientException.ErrorType.READ_TIMEOUT_EXCEPTION) {
                            return !sameServer;
                        }
                        // 服务端异常
                        // 服务端异常切换新节点重试
                        if (((ClientException) e).getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
                            return !sameServer;
                        }
                    }
                    // 连接异常时重试
                    return isConnectionException(e);
                }
            };
        }
    }

由于FeignLoadBalancer是在OpenFeign的LoadBalancerFeignClient中调用一个CachingSpringLoadBalancerFactory创建的,所以我们还需要替换OpenFeign的FeignRibbonClientAutoConfiguration配置类注册的CachingSpringLoadBalancerFactory,并且重写CachingSpringLoadBalancerFactory的create方法,代码如下。

@Configuration
public class RibbonConfiguration {
    /**
     * 使用自定义FeignLoadBalancer缓存工厂
     *
     * @return
     */
    @Bean
    public CachingSpringLoadBalancerFactory cachingSpringLoadBalancerFactory() {
        return new CachingSpringLoadBalancerFactory(springClientFactory) {

            private volatile Map<String, FeignLoadBalancer> cache = new ConcurrentReferenceHashMap<>();

            @Override
            public FeignLoadBalancer create(String clientName) {
                FeignLoadBalancer client = this.cache.get(clientName);
                if (client != null) {
                    return client;
                }
                IClientConfig config = this.factory.getClientConfig(clientName);
                ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
                ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,
                        ServerIntrospector.class);
                // 使用自定义的FeignLoadBalancer
                client = new MyFeignLoadBalancer(lb, config, serverIntrospector);
                this.cache.put(clientName, client);
                return client;
            }
        };
    }
}

需要注意的是:默认ribbon重试一次,POST请求默认是在connect异常的时候会重试,GET请求是都会进行重试,包括请求超时之类的。所以尽量不要用GET请求做增加,修改,删除的操作
POST想重试可配置spring.cloud.loadbalancer.retry.enabled 跟 ribbon.OkToRetryOnAllOperations为true解决

//org.springframework.cloud.netflix.feign.ribbon.FeignLoadBalancer.java
@Override
    public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
            RibbonRequest request, IClientConfig requestConfig) {
        // 1. 从ribbon的配置里面获取OkToRetryOnAllOperations的配置,默认为false,如果配置为true
        if (this.clientConfig.get(CommonClientConfigKey.OkToRetryOnAllOperations,
                false)) {
            // 
            return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
                    requestConfig);
        }
        // 2. 判断请求方式是否是POST请求
        if (!request.toRequest().method().equals("GET")) {
            return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(),
                    requestConfig);
        }
        else {
            // 3. 默认的重试handler
            return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
                    requestConfig);
        }
    }

文章摘抄:HTTP调用超时咋办?重复请求又如何?Ribbon重试策略RetryHandler的配置与源码分析

相关文章

网友评论

      本文标题:RPC(七)

      本文链接:https://www.haomeiwen.com/subject/qjljvrtx.html