美文网首页
Spring Cloud升级之路 - Hoxton - 7. 后

Spring Cloud升级之路 - Hoxton - 7. 后

作者: 干货满满张哈希 | 来源:发表于2020-06-15 15:23 被阅读0次

    1. 修正实例列表乱序导致的负载均衡重试相同实例的问题

    虽然之前考虑了通过每个请求的traceId隔离负载均衡的position来实现重试不会重试相同实例的问题,但是没有考虑在负载均衡过程中,实例列表的更新。

    例如:

    • 请求第一次调用负载均衡,实例列表是:[实例1,实例2],position为1,对2取余=1,所以请求发送到实例2上面了
    • 请求失败,触发重试,实例列表缓存失效,更新后变成了:[实例2,实例1],position为2,对2取余=0,所以请求又发送到实例2上面了
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        Span currentSpan = tracer.currentSpan();
        //如果没有 traceId,就生成一个新的,但是最好检查下为啥会没有
        //是不是 MQ 消费这种没有主动生成 traceId 的情况,最好主动生成下
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        long l = currentSpan.context().traceId();
        int seed = positionCache.get(l).getAndIncrement();
        //这里,serviceInstances可能与上次的内容不同
        //例如上次是实例1,实例2
        //这次是实例2,实例1
        return new DefaultResponse(serviceInstances.get(seed % serviceInstances.size()));
    }
    

    所以,在这里追加排序,保证实例有序,从而进一步不会重试相同的实例。

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        Span currentSpan = tracer.currentSpan();
        //如果没有 traceId,就生成一个新的,但是最好检查下为啥会没有
        //是不是 MQ 消费这种没有主动生成 traceId 的情况,最好主动生成下
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        long l = currentSpan.context().traceId();
        int seed = positionCache.get(l).getAndIncrement();
        return new DefaultResponse(serviceInstances.stream().sorted(Comparator.comparing(ServiceInstance::getInstanceId)).collect(Collectors.toList()).get(seed % serviceInstances.size()));
    }
    

    2. WebFlux环境兼容与WebClient实现相同功能

    maven依赖:

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.7.RELEASE</version>
        </parent>
    
        <properties>
            <disruptor.version>3.4.2</disruptor.version>
            <resilience4j-spring-cloud2.version>1.1.0</resilience4j-spring-cloud2.version>
        </properties>
    
        <dependencies>
            <!--内部缓存框架统一采用caffeine-->
            <!--这样Spring cloud loadbalancer用的本地实例缓存也是基于Caffeine-->
            <dependency>
                <groupId>com.github.ben-manes.caffeine</groupId>
                <artifactId>caffeine</artifactId>
            </dependency>
    
            <!--日志需要用log4j2-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-logging</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-log4j2</artifactId>
            </dependency>
    
            <!--lombok简化代码-->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
    
            <!--注册到eureka-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
    
            <!--spring cloud rpc相关-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-openfeign</artifactId>
            </dependency>
            <!--调用路径记录-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-sleuth</artifactId>
            </dependency>
            <!--暴露actuator相关端口-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-webflux</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
            </dependency>
            <dependency>
                <groupId>io.github.resilience4j</groupId>
                <artifactId>resilience4j-spring-cloud2</artifactId>
                <version>${resilience4j-spring-cloud2.version}</version>
            </dependency>
            <!--log4j2异步日志需要的依赖,所有项目都必须用log4j2和异步日志配置-->
            <dependency>
                <groupId>com.lmax</groupId>
                <artifactId>disruptor</artifactId>
                <version>${disruptor.version}</version>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>Hoxton.SR4</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    

    其他的配置是一样的,重点在于,如何使用WebClient调用其他微服务,并且实现针对Get请求重试或者是所有请求的网络 IO 异常,例如connect timeout等等,或者是断路器异常(因为请求还没发出)。

    WebClient可以加入各种Filter,通过实现这些Filter来实现实例级别的断路器还有重试。

    源码:WebClientConfig.java

    实现重试:

    private static class RetryFilter implements ExchangeFilterFunction {
        private final String serviceName;
    
        private RetryFilter(String serviceName) {
            this.serviceName = serviceName;
        }
    
        @Override
        public Mono<ClientResponse> filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
            return exchangeFunction.exchange(clientRequest).retryWhen(Retry.onlyIf(retryContext -> {
                //get请求一定重试
                return clientRequest.method().equals(HttpMethod.GET)
                        //connect Timeout 是一种 IOException
                        || retryContext.exception() instanceof IOException
                        //实例级别的断路器的Exception
                        || retryContext.exception() instanceof CallNotPermittedException;
            }).retryMax(1).exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(1000)));
        }
    }
    

    实例级别的断路器:

    private static class InstanceCircuitBreakerFilter implements ExchangeFilterFunction {
        private final String serviceName;
        private final CircuitBreakerRegistry circuitBreakerRegistry;
        ;
    
        private InstanceCircuitBreakerFilter(String serviceName, CircuitBreakerRegistry circuitBreakerRegistry) {
            this.serviceName = serviceName;
            this.circuitBreakerRegistry = circuitBreakerRegistry;
        }
    
        @Override
        public Mono<ClientResponse> filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
            CircuitBreaker circuitBreaker;
            //这时候的url是经过负载均衡器的,是实例的url
            String instancId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort();
            try {
                //使用实例id新建或者获取现有的CircuitBreaker,使用serviceName获取配置
                circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, serviceName);
            } catch (ConfigurationNotFoundException e) {
                circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
            }
    
            return exchangeFunction.exchange(clientRequest).transform(CircuitBreakerOperator.of(circuitBreaker));
        }
    }
    

    组装调用某个微服务(这里是service-provider)的WebClient

    public static final String SERVICE_PROVIDER = "service-provider";
    
    @Autowired
    private ReactorLoadBalancerExchangeFilterFunction lbFunction;
    
    @Bean(SERVICE_PROVIDER)
    public WebClient getWebClient(CircuitBreakerRegistry circuitBreakerRegistry) {
        ConnectionProvider provider = ConnectionProvider.builder(SERVICE_PROVIDER)
                .maxConnections(50).pendingAcquireTimeout(Duration.ofSeconds(5)).build();
        HttpClient httpClient = HttpClient.create(provider)
                .tcpConfiguration(client ->
                        //链接超时
                        client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)
                                .doOnConnected(conn -> conn
                                        //读取超时
                                        .addHandlerLast(new ReadTimeoutHandler(1))
                                        .addHandlerLast(new WriteTimeoutHandler(1))
                                )
                );
    
        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                //Retry在负载均衡前
                .filter(new RetryFilter(SERVICE_PROVIDER))
                //负载均衡器,改写url
                .filter(lbFunction)
                //实例级别的断路器需要在负载均衡获取真正地址之后
                .filter(new InstanceCircuitBreakerFilter(SERVICE_PROVIDER, circuitBreakerRegistry))
                .baseUrl("http://" + SERVICE_PROVIDER)
                .build();
    }
    

    这样,我们就可以实现和之前feign一样的微服务调用了。

    @Log4j2
    @RestController
    public class TestController {
        @Resource(name = WebClientConfig.SERVICE_PROVIDER)
        private WebClient webClient;
    
        @RequestMapping("/testGetTimeOut")
        public Mono<String> testGetTimeOut() {
            return webClient.get().uri("/test-read-time-out")
                    .retrieve()
                    .bodyToMono(new ParameterizedTypeReference<>() {
                    });
        }
    
        @RequestMapping("/testPostTimeOut")
        public Mono<String> testPostTimeOut() {
            return webClient.post().uri("/test-read-time-out")
                    .retrieve()
                    .bodyToMono(new ParameterizedTypeReference<>() {
                    });
        }
    }
    

    相关文章

      网友评论

          本文标题:Spring Cloud升级之路 - Hoxton - 7. 后

          本文链接:https://www.haomeiwen.com/subject/qlxhxktx.html