1. Fixing retries that hit the same instance when the instance list changes order
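Earlier, we isolated the load-balancing position per request by keying it on the request's traceId, so that a retry would not go to the same instance. However, that did not account for the instance list being updated between load-balancing calls.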
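For example:
- On the request's first load-balancing call, the instance list is [instance1, instance2] and the position is 1; 1 mod 2 = 1, so the request goes to instance2.
- The request fails and is retried. Meanwhile the instance-list cache has expired and been refreshed to [instance2, instance1]; the position is now 2, and 2 mod 2 = 0, so the request goes to instance2 again.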
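A minimal, dependency-free Java reproduction of the two steps above. Plain strings stand in for ServiceInstance objects (an assumption for illustration); the selection rule is the same position-modulo-size used by the load balancer below.

```java
import java.util.Arrays;
import java.util.List;

final class ReorderedListDemo {
    // Same rule as the load balancer: position modulo the current list size
    static String pick(List<String> instances, int position) {
        return instances.get(position % instances.size());
    }

    public static void main(String[] args) {
        // First attempt: [instance1, instance2], position 1 -> index 1
        String first = pick(Arrays.asList("instance1", "instance2"), 1);
        // Retry after a cache refresh: [instance2, instance1], position 2 -> index 0
        String retry = pick(Arrays.asList("instance2", "instance1"), 2);
        System.out.println(first + " then " + retry);
    }
}
```

Running main prints "instance2 then instance2": the position advanced, but the reordered list cancelled the advance out.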
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    Span currentSpan = tracer.currentSpan();
    // If there is no traceId, start a new trace, but it is worth checking why it is missing.
    // For example, MQ consumers may not create a trace automatically; create one explicitly there.
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long traceId = currentSpan.context().traceId();
    int seed = positionCache.get(traceId).getAndIncrement();
    // Problem: serviceInstances may be ordered differently from the previous call,
    // e.g. [instance1, instance2] last time and [instance2, instance1] this time
    return new DefaultResponse(serviceInstances.get(seed % serviceInstances.size()));
}
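So we sort the instances here (by instanceId) to keep their order stable across cache refreshes; consecutive positions for the same trace then map to different instances, as long as the set of instances itself has not changed.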
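private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    Span currentSpan = tracer.currentSpan();
    // If there is no traceId, start a new trace, but it is worth checking why it is missing.
    // For example, MQ consumers may not create a trace automatically; create one explicitly there.
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long traceId = currentSpan.context().traceId();
    int seed = positionCache.get(traceId).getAndIncrement();
    // Sort by instanceId so the same position maps to the same instance
    // regardless of the order in which the refreshed list arrives
    List<ServiceInstance> sorted = serviceInstances.stream()
            .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
            .collect(Collectors.toList());
    // floorMod keeps the index non-negative even if the counter overflows
    return new DefaultResponse(sorted.get(Math.floorMod(seed, sorted.size())));
}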
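A dependency-free sketch of the fixed selection, under simplifying assumptions: instance IDs are plain strings, a ConcurrentHashMap replaces the Caffeine-backed positionCache, and each trace's counter starts at 0 (the article's cache may seed it differently). The same trace gets consecutive positions, and sorting makes those positions land on different instances even when the refreshed list arrives reordered.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

final class SortedTraceRoundRobin {
    // Hypothetical stand-in for positionCache: one counter per traceId, starting at 0
    private final Map<Long, AtomicInteger> positions = new ConcurrentHashMap<>();

    String choose(long traceId, List<String> instanceIds) {
        int seed = positions.computeIfAbsent(traceId, id -> new AtomicInteger()).getAndIncrement();
        // Sort first so a given position always maps to the same instance
        List<String> sorted = instanceIds.stream()
                .sorted(Comparator.naturalOrder())
                .collect(Collectors.toList());
        return sorted.get(Math.floorMod(seed, sorted.size()));
    }

    public static void main(String[] args) {
        SortedTraceRoundRobin lb = new SortedTraceRoundRobin();
        long traceId = 42L;
        // First attempt and retry see the same instances in different orders
        String first = lb.choose(traceId, Arrays.asList("instance1", "instance2"));
        String retry = lb.choose(traceId, Arrays.asList("instance2", "instance1"));
        System.out.println(first + " then " + retry);
    }
}
```

Running main prints "instance1 then instance2".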
2. WebFlux compatibility: implementing the same features with WebClient
Maven dependencies:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.7.RELEASE</version>
</parent>
<properties>
<disruptor.version>3.4.2</disruptor.version>
<resilience4j-spring-cloud2.version>1.1.0</resilience4j-spring-cloud2.version>
</properties>
<dependencies>
<!--All in-process caching uses Caffeine-->
<!--so the local instance cache used by Spring Cloud LoadBalancer is Caffeine-based too-->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<!--Logging must use log4j2-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!--Lombok to reduce boilerplate-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--Register with Eureka-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--Spring Cloud RPC (OpenFeign)-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!--Call-path tracing-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<!--Expose Actuator endpoints-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--WebFlux: reactive web stack, including WebClient-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<!--Spring Cloud CircuitBreaker backed by Reactor Resilience4j-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-cloud2</artifactId>
<version>${resilience4j-spring-cloud2.version}</version>
</dependency>
<!--Required for log4j2 asynchronous logging; every project must use log4j2 with an async logging configuration-->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
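The rest of the configuration is the same as before. The key question is how to call other microservices with WebClient while retrying in these cases: GET requests on any failure; requests of any method on network I/O exceptions, such as a connect timeout; and requests of any method on circuit-breaker exceptions (the request was never sent).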
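The retry rule above can be written as a plain predicate. This is a dependency-free sketch, not the RetryFilter itself: the hypothetical CircuitOpenException stands in for Resilience4j's CallNotPermittedException, and java.util.concurrent.TimeoutException stands in for Netty's ReadTimeoutException (neither is an IOException). It shows that a POST is retried after a connect timeout, but not after a read timeout, when the server may already have processed it.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.TimeoutException;

final class RetryPolicySketch {
    // Hypothetical stand-in for CallNotPermittedException: the breaker is open, nothing was sent
    static final class CircuitOpenException extends RuntimeException {
        CircuitOpenException(String message) { super(message); }
    }

    // GET is always retried; any method is retried on I/O errors or an open circuit breaker
    static boolean shouldRetry(String method, Throwable error) {
        return "GET".equals(method)
                || error instanceof IOException
                || error instanceof CircuitOpenException;
    }

    public static void main(String[] args) {
        // ConnectException extends SocketException extends IOException
        System.out.println(shouldRetry("POST", new ConnectException("connect timed out")));
        System.out.println(shouldRetry("POST", new TimeoutException("read timed out")));
        System.out.println(shouldRetry("GET", new TimeoutException("read timed out")));
    }
}
```

Running main prints true, false, true.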
WebClient accepts a chain of filters (ExchangeFilterFunction); implementing these filters gives us instance-level circuit breaking and retries.
Retry:
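// Imports used by the filters (Retry comes from reactor-extra, not reactor-core):
// import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
// import org.springframework.http.HttpMethod;
// import org.springframework.web.reactive.function.client.*;
// import reactor.core.publisher.Mono;
// import reactor.retry.Retry;
// import java.io.IOException;
// import java.time.Duration;
private static class RetryFilter implements ExchangeFilterFunction {
    private final String serviceName;

    private RetryFilter(String serviceName) {
        this.serviceName = serviceName;
    }

    @Override
    public Mono<ClientResponse> filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
        // Each resubscription re-runs the downstream filters, including the load balancer
        return exchangeFunction.exchange(clientRequest).retryWhen(Retry.onlyIf(retryContext -> {
            // GET requests are always retried
            return HttpMethod.GET.equals(clientRequest.method())
                    // a connect timeout is an IOException (Netty's ConnectTimeoutException extends java.net.ConnectException)
                    || retryContext.exception() instanceof IOException
                    // the instance-level circuit breaker is open, so the request was never sent
                    || retryContext.exception() instanceof CallNotPermittedException;
        }).retryMax(1).exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(1000)));
    }
}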
Instance-level circuit breaker:
// Additional imports: io.github.resilience4j.circuitbreaker.CircuitBreaker,
// io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry,
// io.github.resilience4j.core.ConfigurationNotFoundException,
// io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator
private static class InstanceCircuitBreakerFilter implements ExchangeFilterFunction {
    private final String serviceName;
    private final CircuitBreakerRegistry circuitBreakerRegistry;

    private InstanceCircuitBreakerFilter(String serviceName, CircuitBreakerRegistry circuitBreakerRegistry) {
        this.serviceName = serviceName;
        this.circuitBreakerRegistry = circuitBreakerRegistry;
    }

    @Override
    public Mono<ClientResponse> filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
        CircuitBreaker circuitBreaker;
        // By this point the load balancer has rewritten the URL to a concrete instance
        String instanceId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort();
        try {
            // Create or reuse the CircuitBreaker named after the instance, using the serviceName configuration
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId, serviceName);
        } catch (ConfigurationNotFoundException e) {
            // No configuration for this service: fall back to the default configuration
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId);
        }
        return exchangeFunction.exchange(clientRequest).transform(CircuitBreakerOperator.of(circuitBreaker));
    }
}
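To see why the breaker is keyed by host:port rather than by service name, here is a deliberately simplified, dependency-free model. SimpleBreaker, its consecutive-failure threshold and the addresses are hypothetical; Resilience4j's real breaker uses sliding windows, failure-rate thresholds and a half-open state. Two failures open the breaker for 10.0.0.1:8080 only, so calls routed to 10.0.0.2:8080 are still allowed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PerInstanceBreakerSketch {
    // Hypothetical breaker: opens after `threshold` consecutive failures; a success resets the count
    static final class SimpleBreaker {
        private final int threshold;
        private int consecutiveFailures;

        SimpleBreaker(int threshold) { this.threshold = threshold; }

        synchronized boolean isOpen() { return consecutiveFailures >= threshold; }
        synchronized void onSuccess() { consecutiveFailures = 0; }
        synchronized void onFailure() { consecutiveFailures++; }
    }

    private final Map<String, SimpleBreaker> breakers = new ConcurrentHashMap<>();
    private final int threshold;

    PerInstanceBreakerSketch(int threshold) { this.threshold = threshold; }

    // Keyed by "host:port" of the resolved instance, like InstanceCircuitBreakerFilter
    SimpleBreaker breakerFor(String host, int port) {
        return breakers.computeIfAbsent(host + ":" + port, key -> new SimpleBreaker(threshold));
    }

    public static void main(String[] args) {
        PerInstanceBreakerSketch registry = new PerInstanceBreakerSketch(2);
        registry.breakerFor("10.0.0.1", 8080).onFailure();
        registry.breakerFor("10.0.0.1", 8080).onFailure();
        System.out.println(registry.breakerFor("10.0.0.1", 8080).isOpen()); // true
        System.out.println(registry.breakerFor("10.0.0.2", 8080).isOpen()); // false
    }
}
```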
Assembling the WebClient that calls a particular microservice (here, service-provider):
public static final String SERVICE_PROVIDER = "service-provider";

@Autowired
private ReactorLoadBalancerExchangeFilterFunction lbFunction;

@Bean(SERVICE_PROVIDER)
public WebClient getWebClient(CircuitBreakerRegistry circuitBreakerRegistry) {
    // Dedicated connection pool: at most 50 connections, wait at most 5 s to acquire one
    ConnectionProvider provider = ConnectionProvider.builder(SERVICE_PROVIDER)
            .maxConnections(50).pendingAcquireTimeout(Duration.ofSeconds(5)).build();
    HttpClient httpClient = HttpClient.create(provider)
            .tcpConfiguration(client ->
                    // connect timeout: 500 ms
                    client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)
                            .doOnConnected(conn -> conn
                                    // read timeout: 1 s
                                    .addHandlerLast(new ReadTimeoutHandler(1))
                                    // write timeout: 1 s
                                    .addHandlerLast(new WriteTimeoutHandler(1))
                            )
            );
    return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            // Retry comes before the load balancer, so each retry selects an instance again
            .filter(new RetryFilter(SERVICE_PROVIDER))
            // Load balancer: rewrites the service-name URL to a concrete instance URL
            .filter(lbFunction)
            // The instance-level circuit breaker must come after the load balancer has resolved the real address
            .filter(new InstanceCircuitBreakerFilter(SERVICE_PROVIDER, circuitBreakerRegistry))
            .baseUrl("http://" + SERVICE_PROVIDER)
            .build();
}
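The ordering comments above can be checked with a small synchronous model of the filter chain. Filter here is a hypothetical simplification of ExchangeFilterFunction (a String URL instead of ClientRequest, no Mono), and the failing instance 10.0.0.1:8080 is made up. The point is that the first registered filter wraps the rest, so a retry re-enters the load balancer, and the breaker always sees a resolved instance URL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class FilterOrderSketch {
    // Hypothetical, synchronous stand-in for ExchangeFilterFunction
    interface Filter {
        String filter(String url, Function<String, String> next);
    }

    // The first filter in the list becomes the outermost wrapper
    static Function<String, String> chain(List<Filter> filters, Function<String, String> transport) {
        Function<String, String> next = transport;
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter f = filters.get(i);
            Function<String, String> downstream = next;
            next = url -> f.filter(url, downstream);
        }
        return next;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        String[] instances = {"10.0.0.1:8080", "10.0.0.2:8080"};
        int[] position = {0};

        // One retry on failure: calling next again re-runs load balancing
        Filter retry = (url, next) -> {
            try { return next.apply(url); }
            catch (RuntimeException e) { log.add("retry"); return next.apply(url); }
        };
        // Round-robin rewrite of the service name to an instance address
        Filter loadBalancer = (url, next) -> {
            String resolved = url.replace("service-provider", instances[position[0]++ % instances.length]);
            log.add("lb -> " + resolved);
            return next.apply(resolved);
        };
        Filter breaker = (url, next) -> { log.add("breaker sees " + url); return next.apply(url); };

        // Hypothetical transport in which 10.0.0.1 always fails
        Function<String, String> transport = url -> {
            if (url.contains("10.0.0.1")) throw new RuntimeException("connect timed out");
            return "ok from " + url;
        };

        String result = chain(Arrays.asList(retry, loadBalancer, breaker), transport)
                .apply("http://service-provider/test");
        log.forEach(System.out::println);
        System.out.println(result);
    }
}
```

Running main logs the first pick of 10.0.0.1:8080, then "retry", then a second pick of 10.0.0.2:8080, and returns "ok from http://10.0.0.2:8080/test".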
With this in place, we can make the same microservice calls as we did with Feign earlier:
@Log4j2
@RestController
public class TestController {
    @Resource(name = WebClientConfig.SERVICE_PROVIDER)
    private WebClient webClient;

    // GET: a read timeout is retried (RetryFilter always retries GET)
    @RequestMapping("/testGetTimeOut")
    public Mono<String> testGetTimeOut() {
        return webClient.get().uri("/test-read-time-out")
                .retrieve()
                .bodyToMono(String.class);
    }

    // POST: a read timeout is not retried; connect timeouts and open breakers are
    @RequestMapping("/testPostTimeOut")
    public Mono<String> testPostTimeOut() {
        return webClient.post().uri("/test-read-time-out")
                .retrieve()
                .bodyToMono(String.class);
    }
}