先看看@LoadBalanced注解:
/**
* Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
* @author Spencer Gibb
*/
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
这就是一个普通的标记注解,作用就是修饰RestTemplate让其拥有负载均衡的能力,查看org.springframework.cloud.client.loadbalancer包下面的class,我们很容易发现
LoadBalancerAutoConfiguration这个类;
@Configuration//这是一个配置类
@ConditionalOnClass(RestTemplate.class)//这个配置文件加载必要条件是存在RestTemplate类和LoadBalancerClient
//Bean
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
//这个很重要,这里的restTemplates是所有的被@LoadBalanced注解的集合,这就是标记注解的作用(Autowired是可以集合注入的)
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
final List<RestTemplateCustomizer> customizers) {
return new SmartInitializingSingleton() {
@Override
public void afterSingletonsInstantiated() {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
}
};
}
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(
LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
}
//生成一个LoadBalancerInterceptor的Bean
@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
//给注解了@LoadBalanced的RestTemplate加上拦截器
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return new RestTemplateCustomizer() {
@Override
public void customize(RestTemplate restTemplate) {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
}
};
}
}
}
总结:现在我们应该大致知道@loadBalanced的作用了,就是起到一个标记RestTemplate的作用,当服务启动时,标记了的RestTemplate对象里面就会被自动加入LoadBalancerInterceptor拦截器,这样当RestTemplate像外面发起http请求时,会被LoadBalancerInterceptor的intercept函数拦截,而intercept里面又调用了LoadBalancerClient接口实现类execute方法,我们接着往下看;
LoadBalancerInterceptor的intercept方法:
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
//这是以服务名为地址的原始请求:例:http://HELLO-SERVICE/hello
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
这里的LoadBalancerClient的实现是RibbonLoadBalancerClient:
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if(serviceInstance instanceof RibbonServer) {
server = ((RibbonServer)serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
//ZoneAwareLoadBalancer的Rule规则去选择Server(这里的规则是统一个zone的优先选择,这里返回的Server
//就是通过serviceId和Rule解析返回的带有IP:port形式的信息的Server)
protected Server getServer(ILoadBalancer loadBalancer) {
return loadBalancer == null ? null : loadBalancer.chooseServer("default");
}
可以看到,在execute
函数的实现中,第一步做的就是通过getServer
根据传入的服务名serviceId
去获得具体的服务实例:
protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer("default");
}
通过getServer
函数的实现源码,我们可以看到这里获取具体服务实例的时候并没有使用LoadBalancerClient
接口中的choose
函数,而是使用了ribbon自身的ILoadBalancer
接口中定义的chooseServer
函数。
我们先来认识一下ILoadBalancer
接口:
public interface ILoadBalancer {
public void addServers(List<Server> newServers);
public Server chooseServer(Object key);
public void markServerDown(Server server);
public List<Server> getReachableServers();
public List<Server> getAllServers();
}
可以看到,在该接口中定义了一个软负载均衡器需要的一系列抽象操作(未例举过期函数):
-
addServers
:向负载均衡器中维护的实例列表增加服务实例。 -
chooseServer
:通过某种策略,从负载均衡器中挑选出一个具体的服务实例。 -
markServerDown
:用来通知和标识负载均衡器中某个具体实例已经停止服务,不然负载均衡器在下一次获取服务实例清单前都会认为服务实例均是正常服务的。 -
getReachableServers
:获取当前正常服务的实例列表。 -
getAllServers
:获取所有已知的服务实例列表,包括正常服务和停止服务的实例。
在该接口定义中涉及到的Server
对象定义的是一个传统的服务端节点,在该类中存储了服务端节点的一些元数据信息,包括:host、port以及一些部署信息等。
而对于该接口的实现,我们可以整理出如上图所示的结构。我们可以看到BaseLoadBalancer
类实现了基础的负载均衡,而DynamicServerListLoadBalancer
和ZoneAwareLoadBalancer
在负载均衡的策略上做了一些功能的扩展。
那么在整合Ribbon的时候Spring Cloud默认采用了哪个具体实现呢?我们通过RibbonClientConfiguration
配置类,可以知道在整合时默认采用了ZoneAwareLoadBalancer
来实现负载均衡器。
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping) {
ZoneAwareLoadBalancer<Server> balancer = LoadBalancerBuilder.newBuilder()
.withClientConfig(config).withRule(rule).withPing(ping)
.withServerListFilter(serverListFilter).withDynamicServerList(serverList)
.buildDynamicServerListLoadBalancer();
return balancer;
}
下面,我们再回到RibbonLoadBalancerClient
的execute
函数逻辑,在通过ZoneAwareLoadBalancer
的chooseServer
函数获取了负载均衡策略分配到的服务实例对象Server
之后,将其内容包装成RibbonServer
对象(该对象除了存储了服务实例的信息之外,还增加了服务名serviceId、是否需要使用HTTPS等其他信息),然后使用该对象再回调LoadBalancerInterceptor
请求拦截器中LoadBalancerRequest
的apply(final ServiceInstance instance)
函数,向一个实际的具体服务实例发起请求,从而实现一开始以服务名为host的URI请求,到实际访问host:post形式的具体地址的转换。
apply(final ServiceInstance instance)
函数中传入的ServiceInstance
接口是对服务实例的抽象定义。在该接口中暴露了服务治理系统中每个服务实例需要提供的一些基本信息,比如:serviceId、host、port等,具体定义如下:
public interface ServiceInstance {
String getServiceId();
String getHost();
int getPort();
boolean isSecure();
URI getUri();
Map<String, String> getMetadata();
}
而上面提到的具体包装Server
服务实例的RibbonServer
对象就是ServiceInstance
接口的实现,可以看到它除了包含了Server
对象之外,还存储了服务名、是否使用https标识以及一个Map类型的元数据集合。
protected static class RibbonServer implements ServiceInstance {
private final String serviceId;
private final Server server;
private final boolean secure;
private Map<String, String> metadata;
protected RibbonServer(String serviceId, Server server) {
this(serviceId, server, false, Collections.<String, String> emptyMap());
}
protected RibbonServer(String serviceId, Server server, boolean secure,
Map<String, String> metadata) {
this.serviceId = serviceId;
this.server = server;
this.secure = secure;
this.metadata = metadata;
}
// 省略实现ServiceInstance的一些获取Server信息的get函数
...
}
那么apply(final ServiceInstance instance)
函数,在接收到了具体ServiceInstance
实例后,是如何通过LoadBalancerClient
接口中的reconstructURI
操作来组织具体请求地址的呢?
@Override
public ClientHttpResponse apply(final ServiceInstance instance)
throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance);
return execution.execute(serviceRequest, body);
}
从apply
的实现中,我们可以看到它具体执行的时候,还传入了ServiceRequestWrapper
对象,该对象继承了HttpRequestWrapper
并重写了getURI
函数,重写后的getURI
会通过调用LoadBalancerClient
接口的reconstructURI
函数来重新构建一个URI来进行访问。
private class ServiceRequestWrapper extends HttpRequestWrapper {
private final ServiceInstance instance;
...
@Override
public URI getURI() {
URI uri = LoadBalancerInterceptor.this.loadBalancer.reconstructURI(
this.instance, getRequest().getURI());
return uri;
}
}
在LoadBalancerInterceptor
拦截器中,ClientHttpRequestExecution
的实例具体执行execution.execute(serviceRequest, body)
时,会调用InterceptingClientHttpRequest
下InterceptingRequestExecution
类的execute
函数,具体实现如下:
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
delegate.getHeaders().putAll(request.getHeaders());
if (body.length > 0) {
StreamUtils.copy(body, delegate.getBody());
}
return delegate.execute();
}
}
可以看到在创建请求的时候requestFactory.createRequest(request.getURI(), request.getMethod());
,这里request.getURI()
会调用之前介绍的ServiceRequestWrapper
对象中重写的getURI
函数。此时,它就会使用RibbonLoadBalancerClient
中实现的reconstructURI
来组织具体请求的服务实例地址。
public URI reconstructURI(ServiceInstance instance, URI original) {
Assert.notNull(instance, "instance can not be null");
String serviceId = instance.getServiceId();
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
Server server = new Server(instance.getHost(), instance.getPort());
boolean secure = isSecure(server, serviceId);
URI uri = original;
if (secure) {
uri = UriComponentsBuilder.fromUri(uri).scheme("https").build().toUri();
}
return context.reconstructURIWithServer(server, uri);
}
从reconstructURI
函数中,我们可以看到,它通过ServiceInstance
实例对象的serviceId
,从SpringClientFactory
类的clientFactory
对象中获取对应serviceId
的负载均衡器的上下文RibbonLoadBalancerContext
对象。然后根据ServiceInstance
中的信息来构建具体服务实例信息的Server
对象,并使用RibbonLoadBalancerContext
对象的reconstructURIWithServer
函数来构建服务实例的URI。
为了帮助理解,简单介绍一下上面提到的SpringClientFactory
和RibbonLoadBalancerContext
:
-
SpringClientFactory
类是一个用来创建客户端负载均衡器的工厂类,该工厂会为每一个不同名的ribbon客户端生成不同的Spring上下文。 -
RibbonLoadBalancerContext
类是LoadBalancerContext
的子类,该类用于存储一些被负载均衡器使用的上下文内容和Api操作(reconstructURIWithServer
就是其中之一)。
从reconstructURIWithServer
的实现中我们可以看到,它同reconstructURI
的定义类似。只是reconstructURI
的第一个保存具体服务实例的参数使用了Spring Cloud定义的ServiceInstance
,而reconstructURIWithServer
中使用了Netflix中定义的Server
,所以在RibbonLoadBalancerClient
实现reconstructURI
时候,做了一次转换,使用ServiceInstance
的host和port信息来构建了一个Server
对象来给reconstructURIWithServer
使用。从reconstructURIWithServer
的实现逻辑中,我们可以看到,它从Server
对象中获取host和port信息,然后根据以服务名为host的URI
对象original中获取其他请求信息,将两者内容进行拼接整合,形成最终要访问的服务实例的具体地址。
public class LoadBalancerContext implements IClientConfigAware {
...
public URI reconstructURIWithServer(Server server, URI original) {
String host = server.getHost();
int port = server .getPort();
if (host.equals(original.getHost())
&& port == original.getPort()) {
return original;
}
String scheme = original.getScheme();
if (scheme == null) {
scheme = deriveSchemeAndPortFromPartialUri(original).first();
}
try {
StringBuilder sb = new StringBuilder();
sb.append(scheme).append("://");
if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
sb.append(original.getRawUserInfo()).append("@");
}
sb.append(host);
if (port >= 0) {
sb.append(":").append(port);
}
sb.append(original.getRawPath());
if (!Strings.isNullOrEmpty(original.getRawQuery())) {
sb.append("?").append(original.getRawQuery());
}
if (!Strings.isNullOrEmpty(original.getRawFragment())) {
sb.append("#").append(original.getRawFragment());
}
URI newURI = new URI(sb.toString());
return newURI;
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
...
}
另外,从RibbonLoadBalancerClient
的execute
的函数逻辑中,我们还能看到在回调拦截器中,执行具体的请求之后,ribbon还通过RibbonStatsRecorder
对象对服务的请求还进行了跟踪记录,这里不再展开说明,有兴趣的读者可以继续研究。
分析到这里,我们已经可以大致理清Spring Cloud中使用Ribbon实现客户端负载均衡的基本脉络。了解了它是如何通过LoadBalancerInterceptor
拦截器对RestTemplate
的请求进行拦截,并利用Spring Cloud的负载均衡器LoadBalancerClient
将以逻辑服务名为host的URI转换成具体的服务实例的过程。同时通过分析LoadBalancerClient
的Ribbon实现RibbonLoadBalancerClient
,可以知道在使用Ribbon实现负载均衡器的时候,实际使用的还是Ribbon中定义的ILoadBalancer
接口的实现,自动化配置会采用ZoneAwareLoadBalancer
的实例来进行客户端负载均衡实现。
网友评论