前言
image.png这篇文章参考了Spring+Cloud微服务实战这本书。但是在此基础上延伸了很多知识点。
源码分析
@LoadBalanced
注解被@Qualifier
注解
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
对于Spring
中的AnnotationMetadata
不太熟悉的同学,可以跑一下下面的CASE
public class MetaTest1 {
public static void main(String[] args) {
StandardAnnotationMetadata metadata = new StandardAnnotationMetadata(
MetaDemo.class, true
);
System.out.println("============ClassMetadata===================");
ClassMetadata classMetadata = metadata;
System.out.println(classMetadata.getClassName());
// 是不是内部类
System.out.println(classMetadata.getEnclosingClassName());
// 返回内部类集合
System.out.println(StringUtils.arrayToCommaDelimitedString(
classMetadata.getMemberClassNames()
));
// 返回接口集合
System.out.println(StringUtils.arrayToCommaDelimitedString(classMetadata.getInterfaceNames()));
// 有没有超类, 如果超类是Object,那么为false
System.out.println(classMetadata.hasSuperClass());
System.out.println(classMetadata.getSuperClassName());
System.out.println(classMetadata.isAnnotation());
System.out.println(classMetadata.isFinal());
// 就是可以独立new出来的, top class或者static inner class
System.out.println(classMetadata.isIndependent());
System.out.println("==========AnnotatedTypeMetadata====================");
AnnotatedTypeMetadata annotatedTypeMetadata = metadata;
System.out.println(annotatedTypeMetadata.isAnnotated(Service.class.getName()));
System.out.println(annotatedTypeMetadata.isAnnotated(Component.class.getName()));
System.out.println(annotatedTypeMetadata.isAnnotated(EnableAsync.class.getName()));
System.out.println(annotatedTypeMetadata.getAnnotationAttributes(Service.class.getName()));
System.out.println(annotatedTypeMetadata.getAnnotationAttributes(Component.class.getName()));
System.out.println(annotatedTypeMetadata.getAnnotationAttributes(Repository.class.getName()));
System.out.println(annotatedTypeMetadata.getAnnotationAttributes(EnableAsync.class.getName()));
// 数组返回 不会进行属性合并的操作
System.out.println(annotatedTypeMetadata.getAllAnnotationAttributes(Service.class.getName()));
System.out.println(annotatedTypeMetadata.getAllAnnotationAttributes(Component.class.getName()));
System.out.println(annotatedTypeMetadata.getAllAnnotationAttributes(EnableAsync.class.getName()));
System.out.println("=================AnnotationMetadata=================");
AnnotationMetadata annotationMetadata = metadata;
// 获取元注解
System.out.println(annotationMetadata.getAnnotationTypes());
// 获取service注解的元注解
System.out.println(annotationMetadata.getMetaAnnotationTypes(Service.class.getName()));
// 获取component注解的元注解
/**
* meta就是获取注解上面的注解,会排除掉java.lang这些注解们
*/
System.out.println(annotationMetadata.getMetaAnnotationTypes(Component.class.getName()));
// 不会去找元注解的,true
System.out.println(annotationMetadata.hasAnnotation(Service.class.getName()));
// false
System.out.println(annotationMetadata.hasAnnotation(Component.class.getName()));
/**
* 确定基础类是否有一个自身的注释 使用给定类型的元注释进行注释。
*/
// false
System.out.println(annotationMetadata.hasMetaAnnotation(Service.class.getName()));
// true
System.out.println(annotationMetadata.hasMetaAnnotation(Component.class.getName()));
System.out.println(annotationMetadata.hasAnnotatedMethods(Autowired.class.getName()));
// StandardMethodMetadata
annotationMetadata.getAnnotatedMethods(Autowired.class.getName())
.forEach(method -> {
System.out.println(method.getClass());
System.out.println(method.getDeclaringClassName());
System.out.println(method.getMethodName());
System.out.println(method.getReturnTypeName());
});
}
@Repository("repository")
@Service("serviceName")
@EnableAsync
public static class MetaDemo extends HashMap<String, String> implements
Serializable {
private static class InnerClass {
}
@Autowired
private String getName() {
return "xiaoma";
}
}
}
运行结果如下:
============ClassMetadata===================
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1$MetaDemo
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1$MetaDemo$InnerClass
java.io.Serializable
true
java.util.HashMap
false
false
true
==========AnnotatedTypeMetadata====================
true
true
true
{value=serviceName}
{value=repository}
{value=repository}
{order=2147483647, annotation=interface java.lang.annotation.Annotation, mode=PROXY, proxyTargetClass=false}
{value=[serviceName]}
{value=[, ]}
{order=[2147483647], annotation=[interface java.lang.annotation.Annotation], mode=[PROXY], proxyTargetClass=[false]}
=================AnnotationMetadata=================
[org.springframework.stereotype.Repository, org.springframework.stereotype.Service, org.springframework.scheduling.annotation.EnableAsync]
[org.springframework.stereotype.Component, org.springframework.stereotype.Indexed]
[]
true
false
false
true
true
class org.springframework.core.type.StandardMethodMetadata
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1$MetaDemo
getName
java.lang.String
负载均衡器客户端LoadBalancerClient
接口
// 使用从负载均衡器中挑选一个对应服务的实例
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
// 为系统构建一个合适的host:port形式的url
// ServiceInstance对象是带有host和port的具体服务实例
// 后者url对象则是使用逻辑服务定义为host的URL,比如http://SERVICE-PROVIDER/serviceprovider/hello
URI reconstructURI(ServiceInstance instance, URI original);
服务实例选择器ServiceInstanceChooser
接口
// 从负载均衡器中挑选一个服务实例
ServiceInstance choose(String serviceId)
LoadBalancerAutoConfiguration
类实现客户端负载均衡器的自动化配置
-
RetryLoadBalancerInterceptor
:用于实现对客户端发起请求时拦截,以实现客户端负载均衡 -
RestTemplateCustomizer
:用于给RestTemplate
增加LoadBalancerInterceptor
拦截器 - 维护了一个被
@LoadBalanced
注解修饰的RestTemplate
对象列表,并在这里进行初始化通过调用RestTemplatCustomizer
的实例给需要客户端负载均衡的RestTemplate
增加LoadBalancerInterceptor
拦截器 -
RibbonLoadBalancedRetryFactory
给客户端负债均衡增加重试机制 -
LoadBalancerRequestFactory
对request
进行包装加工成LoadBalancerRequest
,调用ClientHttpRequestExecution
中的execute(serviceRequest, body)
,返回ClientHttpResponse
LoadBalancerInterceptor
:
当一个@LoadBalanced
注解修饰的RestTemplate
对象向外发起Http请求,
会被这个类所拦截,由于我们在使用RestTemplate
时采用了服务器名作为host
,
所以直接从HttpRequest
的URI
对象中通过getHost
就可以拿到服务名。
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
RibbonLoadBalancerClient
中execute
方法可以看到getServer
中是通过ILoadBalancer
接口来获取服务。
对了这里说明以下 在一个应用里面比如调用了A,B服务 那么应用会创建一个A的Ribbon
容器和B的ribbon
容器。而且容器还是懒加载,所以第一次请求总是会超时
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
Ribbon
的容器工厂是SpringClientFactory extends NamedContextFactory<RibbonClientSpecification>
在RibbonAutoConfiguration
配置中, 有将List<RibbonClientSpecification> configurations = new ArrayList<>()
属性设置给SpringClientFactory
那SpringBoot是怎么扫描@RibbonClient
注解的呢? RibbonClientSpecification
是怎么注册到Spring容器呢?
服务ID和对应的ribbon配置是怎么关联起来的呢?详情可以看到RibbonClientConfigurationRegistrar
可以看到我们将每个服务ID和对应的ribbon配置通过RibbonClientSpecification
来维护,同时注册到Spring容器中。
private void registerClientConfiguration(BeanDefinitionRegistry registry,
Object name, Object configuration) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(RibbonClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
registry.registerBeanDefinition(name + ".RibbonClientSpecification",
builder.getBeanDefinition());
}
Feign
和Ribbon
容器创建就是在这里进行的, 还把他们的配置类装在自己的容器里, 此时SpringBoot
容器也有一份哟。同时将SpringBoot
容器设置成父容器。
protected AnnotationConfigApplicationContext createContext(String name) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name)
.getConfiguration()) {
context.register(configuration);
}
}
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
this.propertySourceName,
Collections.<String, Object> singletonMap(this.propertyName, name)));
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
}
context.setDisplayName(generateDisplayName(name));
context.refresh();
return context;
}
在RibbonClientConfiguration
配置类中, 配置基本和客户端怎么使用ribbon
有关系。
负载均衡器ILoadBalancer
接口
// 向负载均衡器中维护的实例列表增加服务实例
public void addServers(List<Server> newServers);
// 通过某种策略,从负载均衡器中挑选出一个具体的实例
public Server chooseServer(Object key);
// 用来通知和标识负载均衡器中某个具体实例已经停止服务,不然负载均衡器
在下一次获取服务实例清单前都会认为服务实例均是正常
public void markServerDown(Server server);
// 获取当前正常服务的实例列表
public List<Server> getReachableServers();
// 获取所有已知的服务实例列表,包括正常服务和停止服务的实例。
public List<Server> getAllServers();
BaseLoadBalancer
类实现了基础的负载均衡功能
而DynamicServerListLoadBalancer
和ZoneAwardLoadBalancer
在RibbonClientConfiguration
中可以看到是ZoneAwardLoadBalancer
可以看到有ILoadBalancer
,IPing
,IRule
,ServerList
,ServerListFilter
之类的配置
这里我们可以看到PropertiesFactory
这个类就明白了为什么可以通过serviceId.ribbon.NFLoadBalancerRuleClassName=RuleImpl.class
配置负载均衡策略
实际上是靠Constructor
(参数是IClientConfig
)反射去完成实例化的。
如果是IRule
,IPing
的话可能会抛出异常,但是这里忽略掉了。
不能靠有参构造器初始化的话,接着调用BeanUtils.instantiate(clazz)
完成初始化;
public class PropertiesFactory {
@Autowired
private Environment environment;
private Map<Class, String> classToProperty = new HashMap<>();
public PropertiesFactory() {
classToProperty.put(ILoadBalancer.class, "NFLoadBalancerClassName");
classToProperty.put(IPing.class, "NFLoadBalancerPingClassName");
classToProperty.put(IRule.class, "NFLoadBalancerRuleClassName");
classToProperty.put(ServerList.class, "NIWSServerListClassName");
classToProperty.put(ServerListFilter.class, "NIWSServerListFilterClassName");
}
public boolean isSet(Class clazz, String name) {
return StringUtils.hasText(getClassName(clazz, name));
}
public String getClassName(Class clazz, String name) {
if (this.classToProperty.containsKey(clazz)) {
String classNameProperty = this.classToProperty.get(clazz);
String className = environment.getProperty(name + "." + NAMESPACE + "." + classNameProperty);
return className;
}
return null;
}
@SuppressWarnings("unchecked")
public <C> C get(Class<C> clazz, IClientConfig config, String name) {
String className = getClassName(clazz, name);
if (StringUtils.hasText(className)) {
try {
Class<?> toInstantiate = Class.forName(className);
return (C) SpringClientFactory.instantiateWithConfig(toInstantiate, config);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Unknown class to load "+className+" for class " + clazz + " named " + name);
}
}
return null;
}
}
而我们上面在RibbonLoadBalancerClient
中的getLoadBalancer(serviceId)
方法最后就是调用以下的方法
首先从该服务的Ribbon
容器, SpringBoot
容器 寻找ILoadBalancer
的实现类
如果不存在的话, 接着找IClientConfig
的实现类
找到了IClientConfig
的实现类就可以围魏救赵了
接着调用instantiateWithConfig(getContext(name), type, config);
通过有参构造器初始化ILoadBalancer
, 参数是IClientConfig
这时候你就会问了通过反射初始化 会导致很多属性没有注入,那绝对有问题啊
其实这些属性比如ServerList
,ServerListFilter
,Rule
,Ping
之类的属性
会通过DefaultClientConfigImpl
中去拿, 也就是构造器中的那个IClientConfig
参数
public ILoadBalancer getLoadBalancer(String name) {
return getInstance(name, ILoadBalancer.class);
}
@Override
public <C> C getInstance(String name, Class<C> type) {
C instance = super.getInstance(name, type);
if (instance != null) {
return instance;
}
IClientConfig config = getInstance(name, IClientConfig.class);
return instantiateWithConfig(getContext(name), type, config);
}
在SpringClientFactory
中instantiateWithConfig
方法中看到以下代码
可能是为了兼容 IPing
,IRule
,ServerList
,ServerListFilter
这些实现类,同时还注入属性。
if (result == null) {
result = BeanUtils.instantiate(clazz);
if (result instanceof IClientConfigAware) {
((IClientConfigAware) result).initWithNiwsConfig(config);
}
if (context != null) {
context.getAutowireCapableBeanFactory().autowireBean(result);
}
}
上面说到Ribbin
容器是懒加载,那么我们可不可以设置成急加载呢?
RibbonAutoConfiguration
中可以配置RibbonApplicationContextInitializer
通过接收SpringBoot
启动完毕事件 ApplicationReadyEvent
初始化clients
中的容器
这样就可以避免第一次调用超时,但是增加了应用启动时间。有得有失把
@Configuration
@ConditionalOnClass({ IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
@EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class})
public class RibbonAutoConfiguration {
...
}
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
@Bean
@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
@ConditionalOnMissingBean
public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) {
return new RibbonLoadBalancedRetryFactory(clientFactory);
}
@Bean
@ConditionalOnMissingBean
public PropertiesFactory propertiesFactory() {
return new PropertiesFactory();
}
@Bean
@ConditionalOnProperty(value = "ribbon.eager-load.enabled")
public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
return new RibbonApplicationContextInitializer(springClientFactory(),
ribbonEagerLoadProperties.getClients());
}
回归正题 在RibbonLoadBalancerClient
中的execute()
中,我们已经获取了ServiceInstance
,那么怎么得到请求的url呢
把目光转移到request.apply(serviceInstance)
这个方法
也就是上文说到的LoadBalancerRequestFactory
中的createRequest()
实现该接口中的这个方法
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if(serviceInstance instanceof RibbonServer) {
server = ((RibbonServer)serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
在LoadBalancerInterceptor
拦截器中,ClientHttpRequestExecution
的实例
具体会执行execute(servletRequest,body)
时,会调用InterceptingClientHttpRequest
下
InterceptingRequestExecution
类中的execute
函数, 拦截器的链式处理就体现在这里
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
还记得我们之前说提到的RestTemplate
中设置LoadBalancerInterceptor
吗?
这里的拦截器责任链就是我们之前设置的。
我们调用restTemplate.execute()
最终会经过这里被拦截
RestTemplate
中的HttpRequestFactory
就是InterceptingClientHttpRequestFactory
其createRequest
方法如下
@Override
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}
可以看到RestTemplate
中的doExecute()
最终会执行request.execute()
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
看到InterceptingClientHttpRequest
中的executeInternal
那么流程又回到我们刚才的InterceptingRequestExecution
中的execute
函数
@Override
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
return requestExecution.execute(this, bufferedOutput);
}
在InterceptingRequestExecution
中的execute
函数
注意到ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
这里的requestFactory
可不是LoadBalancerRequestFactory
, 别搞混了
这里的requestFatory
是实现了ClientHttpRequestFactory
接口的factory
,而LoadBalancerRequestFactory
没有实现这个接口.
这里的requestFactory
其实是通过setRequestFactory(requestFactory)
函数去设置的。
在我们应用里面没有调用这个函数,其默认的实现是SimpleClientHttpRequestFactory
比较常见的实现有HttpComponentsClientHttpRequestFactory
,RibbonClientHttpRequestFactory
等等等
public abstract class HttpAccessor {
/** Logger available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
...
}
这里的request.getURI
其实是调用的ServiceRequestWrapper
中的getURI
方法
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
先前我们在LoadBalancerInterceptor
有设置loadBalancerClient
和requestFactory
属性,我们调用loadBalancer
.chooseServer
获取Server
节点信息, 把它包装成RibbonServer
, 也就是这里的serviceInstance
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
ServiceRequestWrapper
中的getURI
方法如下
public class ServiceRequestWrapper extends HttpRequestWrapper {
private final ServiceInstance instance;
private final LoadBalancerClient loadBalancer;
public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
LoadBalancerClient loadBalancer) {
super(request);
this.instance = instance;
this.loadBalancer = loadBalancer;
}
@Override
public URI getURI() {
URI uri = this.loadBalancer.reconstructURI(
this.instance, getRequest().getURI());
return uri;
}
}
返回的URI实则是LoadBalancerClient
中获取得到的。
这里我们可以看到从 服务的ribbon
容器获取其RibbonLoadBalancerContext
上下文信息
public class RibbonLoadBalancerClient implements LoadBalancerClient {
private SpringClientFactory clientFactory;
public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
@Override
public URI reconstructURI(ServiceInstance instance, URI original) {
Assert.notNull(instance, "instance can not be null");
String serviceId = instance.getServiceId();
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
URI uri;
Server server;
if (instance instanceof RibbonServer) {
RibbonServer ribbonServer = (RibbonServer) instance;
server = ribbonServer.getServer();
uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
} else {
server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
uri = updateToSecureConnectionIfNeeded(original, clientConfig,
serverIntrospector, server);
}
return context.reconstructURIWithServer(server, uri);
}
RibbonLoadBalancerContext
根据RibbonServer.getServer
返回的信息和原始的URI
得到最终的URI信息, 也就是上文提到带有服务HOST:PORT的URI
public URI reconstructURIWithServer(Server server, URI original) {
String host = server.getHost();
int port = server.getPort();
String scheme = server.getScheme();
if (host.equals(original.getHost())
&& port == original.getPort()
&& scheme == original.getScheme()) {
return original;
}
if (scheme == null) {
scheme = original.getScheme();
}
if (scheme == null) {
scheme = deriveSchemeAndPortFromPartialUri(original).first();
}
try {
StringBuilder sb = new StringBuilder();
sb.append(scheme).append("://");
if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
sb.append(original.getRawUserInfo()).append("@");
}
sb.append(host);
if (port >= 0) {
sb.append(":").append(port);
}
sb.append(original.getRawPath());
if (!Strings.isNullOrEmpty(original.getRawQuery())) {
sb.append("?").append(original.getRawQuery());
}
if (!Strings.isNullOrEmpty(original.getRawFragment())) {
sb.append("#").append(original.getRawFragment());
}
URI newURI = new URI(sb.toString());
return newURI;
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
不知不觉,已经写了这么多。勿以善小而不为,勿以恶小而为之,再接再厉把
为什么一个服务对应一个ribbon
容器,Feigin
容器呢? 我觉得应该是为了资源隔离把
SpringClientFactory
类是应该用来创建客户端负载均衡器的工程类,该工厂类会为每一个不同名的ribbon
客户端生成不同的Spring
上下文
RibbonLoadBalancerContext
类是LoadBalancerContext
的子类,该类用于存储一些被负载均衡器使用的上下文内容和API
操作(比如得到真实的URI
, 重试机制的判断, 获取DefaultClientConfigImpl
类的配置信息等等等)
关于RibbonLoadBalancerContext
配置可以看RibbonClientConfiguration
配置类
@Bean
@ConditionalOnMissingBean
public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
IClientConfig config, RetryHandler retryHandler) {
return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
}
我们回顾整一个请求的过程, 首先RestTemplate.doExecute()
实际上是调用request.execute()
方法,
此时我们要进行狸猫换太子操作, 通过InterceptingClientHttpRequestFactory
(内部委托了一个真正执行发起请求的requestFactory
)创建出带拦截属性的InterceptingClientHttpRequest
对象
该对象调用execute()
会被拦截LoadBalancerInterceptor
所拦截到请求, 我们负载均衡器根据rule
选出一个适合的服务实例地址,
再把请求交给InterceptingClientHttpRequestFactory
中委托的requestFactory
处理
ClientHttpRequestFactory
有很多实现,比如nettyClient
,HttpClient
,RibbonClient
,OkHttpClient
, 通过这些客户端发起对服务实例地址的请求。
接下来的篇幅会讲到上文所提到的负载均衡器ILoadBalancer
接口的实现
AbstractLoadBalancer
是ILoadBalancer
接口的抽象实现,它把实例进行了分组
-
ALL
:所有服务实例 -
STATUS_UP
:正常服务的实例 -
STATUS_NOT_UP
:停止服务的实例
最后定义了2个抽象函数
-
getServerList(ServerGroup serverGroup)
:根据分组类型获取服务实例列表 -
getLoadBalancerStats()
:获取当前负载均衡器各个服务实例当前的属性和统计信息
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}
/**
* delegate to {@link #chooseServer(Object)} with parameter null.
*/
public Server chooseServer() {
return chooseServer(null);
}
/**
* List of servers that this Loadbalancer knows about
*
* @param serverGroup Servers grouped by status, e.g., {@link ServerGroup#STATUS_UP}
*/
public abstract List<Server> getServerList(ServerGroup serverGroup);
/**
* Obtain LoadBalancer related Statistics
*/
public abstract LoadBalancerStats getLoadBalancerStats();
}
BaseLoadBalancer
- 定义维护了2个存储服务实例
Server
对象的列表,一个用于存储所有服务实例的清单,一个用于存储正常服务的实例清单 - 定义用来存储负载均衡器各服务实例属性和统计信息的
LoadBalancerStats
对象 - 定义了检查服务实例是否正常服务的
IPing
对象,在BaseLoadBalancer
中默认为null
,需要在构造时注入实现 - 定义了检查服务实例操作的执行策略对象
IPingStrategy
,默认实现是SerialPingStrategy
,采用的是线性遍历ping
服务实例的方式实现检查 - 定义了负载均衡的处理规则
IRule
对象 - 启动
ping
任务,在BaseLoadBalancer
的默认构造函数中,会直接启动一个用于定时检查Server
是否健康的任务,该任务默认的执行间隔是10s
DynamicServerListLoadBalancer
定义了服务列表操作对象ServerList
-
getInitialListOfServers
用于获取初始化的服务实例清单 -
getUpdatedListOfServers
用于获取更新的服务实例清单
在DynamicServerListLoadBalancer
中使用的是哪个ServerList
的实现呢
在EurekaRibbonClientConfiguration
中可以看到端倪, 可以看到IPing
,IServerListFilter
的实现都是和Eureka
相关的。
EurekaRibbonClientConfiguration
和RibbonAutoConfiguration
是相辅相成的,前者优先级高于后者。
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
return this.propertiesFactory.get(IPing.class, config, serviceId);
}
NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
ping.initWithNiwsConfig(config);
return ping;
}
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
DomainExtractingServerList
中的getInitialListOfServers
和getUpdatedListOfServers
的具体实现,其实委托给了DiscoveryEnabledNIWSServerList
而DisconveryEnabledNIWSServerList
是通过obtainServersViaDiscovery
通过服务发现机制来实现服务实例的获取
主要通过EurekaClient
从服务注册中心获取具体的服务实例InstanceInfo
列表
这里的vipAddress
可以是逻辑上的服务名,比如hello-service
接着对这些服务实例进行遍历, 将状态为UP
的实例转换成DiscoveryEnabledServer
对象最后将这些实例组成列表返回.
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
DomainExtractingServer
后续将这些list
通过setZones
函数继续处理,转换成
DomainExtractingServer
, 设置一些必要的属性,比如id
,zone
,isAliveFlag
等等等
ServerListUpdater
在DynamicServerListLoadBalancer
中可以看到定义
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
对于ServerListUpdater
的实现有2个
-
PollingServerListUpdater
:动态服务列表更新的默认策略,在DynamicServerListLoadBalancer
负载均衡器的默认实现就是它,它是通过定时更新回调UpdateAction中的doUpdate函数 -
EurekaNotificationServerListUpdater
:它的触发机制与PollingServerListUpdater
不同,它需要利用Eureka
的时间监听器来驱动服务的更新操作。通过接收EurekaEvent
时间,异步回调doUpdate函数完成刷新实例列表。
PollingServerListUpdater
内部实现比较简单, 定时任务初始化1s后执行, 并以30s为周期重复执行,同时还会记录最后更新时间,是否存活等信息。
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
ServerListFilter
会在上文中的updateListOfServers()
函数中过滤一些servers
节点,
在ribbon
中默认实现就是ZonePreferenceServerListFilter
, 这个类的父类是ZoneAffinityServerListFilter
该过滤器基于区域感知的方式实现服务实例的过滤,也就是说,它会根据服务的实例所处的Zone
和消费者自身的所处Zone
进行比较,过滤掉这些不是同处一个区域的实例。
首先过滤出消费者和服务的实例处于同一个zone
的server
节点,但不是不会马上过滤的结果, 而是通过shouldEnableZoneAffinity(filteredServers)
函数来判断是否要启用区域感知
@Override
public List<T> getFilteredListOfServers(List<T> servers) {
if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
List<T> filteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
if (shouldEnableZoneAffinity(filteredServers)) {
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
}
使用LoadBalancerStats
的getZoneSnapShot
方法来获取这些过滤后的同区域实例的基础指标。比如实例数量,断路器断开数,活动请求数,实例平均负载等
根据这些指标和设置的阈值进行对比,如果有一个条件符合, 就不启动区域感知过滤的服务实例清单。
当集群出现区域故障时,依然可以依靠其他区域的实例进行正常服务提供了完善的高可用保障
blackOutServerPercentage
:故障实例百分比(断路器断开数/实例数量)>=0.8
activeRequestPerServer
:实例平均负载>=0.6
avaiableServers
:可用实例数(实例数-断路器断开数) < 2
private boolean shouldEnableZoneAffinity(List<T> filtered) {
if (!zoneAffinity && !zoneExclusive) {
return false;
}
if (zoneExclusive) {
return true;
}
LoadBalancerStats stats = getLoadBalancerStats();
if (stats == null) {
return zoneAffinity;
} else {
logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
double loadPerServer = snapshot.getLoadPerServer();
int instanceCount = snapshot.getInstanceCount();
int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
|| loadPerServer >= activeReqeustsPerServerThreshold.get()
|| (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
return false;
} else {
return true;
}
}
}
ZonePreferenceServerListFilter
实现通过配置Zone
或者Eureka
实例元数据的Zone
来过滤出同区域的服务实例
ZoneAwareLoadBalancer
是对DynamicServerListLoadBalancer
的扩展
DynamicServerListLoadBalancer
是重用其父类的chooseServer
方法,
采用RoundRobinRule
规则,以线性轮询的方式来选择调用的服务实例, 它会把所有实例视为一个Zone
下的节点来看待,这样会周期性的产生跨区域Zone
访问
ZoneAwareLoadBalancer
重写了setServerListForZones(Map<String,List<Server>> zoneServersMap)
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
if (balancers == null) {
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
getLoadBalancer(zone).setServersList(entry.getValue());
}
// check if there is any zone that no longer has a server
// and set the list to empty so that the zone related metrics does not
// contain stale data
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}
DynamicServerListLoadBalancer
中这个方法是根据Zone
划分实例列表,
交给LoadBalancerStats
中的zoneStatsMap
集合管理,每个Zone
对应一个ZoneStats
,用于存储每个Zone
节点的状态。
为每个Zone
分配一个BaseLoadBalancer
,每个BaseLoadBalancer
维护各自Zone
的服务实例列表
@VisibleForTesting
BaseLoadBalancer getLoadBalancer(String zone) {
zone = zone.toLowerCase();
BaseLoadBalancer loadBalancer = balancers.get(zone);
if (loadBalancer == null) {
// We need to create rule object for load balancer for each zone
IRule rule = cloneRule(this.getRule());
loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
if (prev != null) {
loadBalancer = prev;
}
}
第二个循环对Zone中服务实例列表进行检查
再来看看ZoneAwareLoadBalancer
选择实例的逻辑
- 1.如果当前维护的
Zone
个数小于1
,默认走父类DynamicServerListLoadBalancer的chooseServer
实现 - 2.从
LoadBalancerStats
拿出所有Zone
快照信息 - 3.获取所有可用的
Zone
(会过滤一些不符合规则的Zone
,从实例数,负载,实例故障率维度去考量) - 4.随机选择一个
Zone
- 5.获取当前
Zone
的负载均衡器, 根据IRule
选择具体的服务实例
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
IRule
- 1.RandomRule 随机
- 2.RoundRobinRule 轮询
- 3.RetryRule 带重试的轮询
- 4.WeightedResponseTimeRule 带权重的轮询.该策略是对RoundRobinRule的扩展, 根据实例的运行情况来计算权重,并且根据权重来挑选实例达到更优的分配效果。
WeightedResponseTimeRule
需要注入ILoadBalancer
属性。在初始化的时候 会创建定时任务, 每30s执行一次 计算每个服务实例的权重。
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
+ name, true);
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
serverWeightTaskTimerInterval);
class DynamicServerWeightTask extends TimerTask {
public void run() {
ServerWeight serverWeight = new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception e) {
logger.error("Error running DynamicServerWeightTask for {}", name, e);
}
}
}
比如有4个实例A,B,C,D,它们平均响应时间为10,40,80,100. 总响应时间为10+40+80+100=230。所以A的权重是230-10=220 【0,220】,B的权重是220+230-40=410 (220,410] ,C的权重是410+230-80=560 (410,560] ,D的权重是560+230-100=690 (560,690)
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}
try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;
// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
}
实例的选择, 生成一个[0, 最大权重值)区间内的随机数。遍历权重列表,找到匹配的Server节点。
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
// get hold of the current reference in case it is changed from the other thread
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
// last one in the list is the sum of all weights
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}
server = allList.get(serverIndex);
}
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Next.
server = null;
}
return server;
}
- 5.
ClientConfigEnabledRoundRobin
: 内部的实现还是轮询 - 6.
BestAvailableRule
: 注入了LoadBalancerStats
,遍历所有服务实例, 过滤故障的实例,选择最空闲的实例。如果LoadBalancerStats
为空的话,采用父类ClientConfigEanbledRoundRobin
实现 - 7.
PredicateBasedRule
:先过滤,后轮询
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
/**
* Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
*
*/
public abstract AbstractServerPredicate getPredicate();
/**
* Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
* The performance for this method is O(n) where n is number of servers to be filtered.
*/
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}
AbstractServerPredicate
中的实现
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
- 8.
AvailabilityFilterRule
继承上文的PredicateBasedRule
, 先轮询一个Server
,看这个Server
是否满足判断条件。如果满足直接返回,
如果不满足一直循环至11次。 11次都没有选出合适的Server, 降级策略, 调用父类PredicateBasedRule
的默认实现
@Override
public Server choose(Object key) {
int count = 0;
Server server = roundRobinRule.choose(key);
while (count++ <= 10) {
if (predicate.apply(new PredicateKey(server))) {
return server;
}
server = roundRobinRule.choose(key);
}
return super.choose(key);
}
这是一个组合Predicate
,在上文中的predicate.apply(new PredicateKey(server)
实际上会调用CompositePredicate
中的AvailabilityPredicate
public AvailabilityFilteringRule() {
super();
predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
可见AvailabilityPredicate
是根据当前Server
的状态来过滤的。
@Override
public boolean apply(@Nullable PredicateKey input) {
LoadBalancerStats stats = getLBStats();
if (stats == null) {
return true;
}
return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
private boolean shouldSkipServer(ServerStats stats) {
if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
return true;
}
return false;
}
上文中的降级策略最终会走到PredicateBaseRule
, 这个规则中的predicate
是其子类AvailabilityFilteringRule
中的CompositePredicate
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
CompositePredicate
的父类是AbstractServerPredicate
,最终又回到AbstractServerPredicate
的chooseRoundRobinAfterFiltering
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
但是其中的getEligibleServers(servers, loadBalancerKey)
方法又被子类CompositePredicate
重写,一切仿佛又回到了原点, 这里就是上文所说的降级策略, 因为fallbackPredicate
默认实现为true,所以这里的逻辑是先走一遍AvailabilityPredicate
, 如果所有可用的server
列表, 如果还不满足的话, 就直接轮询了。
@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
Iterator<AbstractServerPredicate> i = fallbacks.iterator();
while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
&& i.hasNext()) {
AbstractServerPredicate predicate = i.next();
result = predicate.getEligibleServers(servers, loadBalancerKey);
}
return result;
}
- 9.
ZoneAvoidanceRule
:还是遵循先过滤后轮询思想, 首先执行这2个ZoneAvoidancePredicate
,AvailabilityPredicate
逻辑,先找出合适的Zone
,再找Zone
下面健康的Server
实例。如果都没找到,执行降级策略。
public class ZoneAvoidanceRule extends PredicateBasedRule {
private static final Random random = new Random();
private CompositePredicate compositePredicate;
public ZoneAvoidanceRule() {
super();
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
return CompositePredicate.withPredicates(p1, p2)
.addFallbackPredicate(p2)
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {
Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();
for (String zone : lbStats.getAvailableZones()) {
ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);
map.put(zone, snapshot);
}
return map;
}
static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
Set<String> chooseFrom) {
if (chooseFrom == null || chooseFrom.size() == 0) {
return null;
}
String selectedZone = chooseFrom.iterator().next();
if (chooseFrom.size() == 1) {
return selectedZone;
}
int totalServerCount = 0;
for (String zone : chooseFrom) {
totalServerCount += snapshot.get(zone).getInstanceCount();
}
int index = random.nextInt(totalServerCount) + 1;
int sum = 0;
for (String zone : chooseFrom) {
sum += snapshot.get(zone).getInstanceCount();
if (index <= sum) {
selectedZone = zone;
break;
}
}
return selectedZone;
}
public static Set<String> getAvailableZones(
Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
double triggeringBlackoutPercentage) {
if (snapshot.isEmpty()) {
return null;
}
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
if (availableZones.size() == 1) {
return availableZones;
}
Set<String> worstZones = new HashSet<String>();
double maxLoadPerServer = 0;
boolean limitedZoneAvailability = false;
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
String zone = zoneEntry.getKey();
ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
int instanceCount = zoneSnapshot.getInstanceCount();
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
double loadPerServer = zoneSnapshot.getLoadPerServer();
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
// zone override is not needed here
return availableZones;
}
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;
}
public static Set<String> getAvailableZones(LoadBalancerStats lbStats,
double triggeringLoad, double triggeringBlackoutPercentage) {
if (lbStats == null) {
return null;
}
Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);
return getAvailableZones(snapshot, triggeringLoad,
triggeringBlackoutPercentage);
}
@Override
public AbstractServerPredicate getPredicate() {
return compositePredicate;
}
}
public class ZoneAvoidancePredicate extends AbstractServerPredicate {
private volatile DynamicDoubleProperty triggeringLoad = new DynamicDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer.triggeringLoadPerServerThreshold", 0.2d);
private volatile DynamicDoubleProperty triggeringBlackoutPercentage = new DynamicDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer.avoidZoneWithBlackoutPercetage", 0.99999d);
private static final Logger logger = LoggerFactory.getLogger(ZoneAvoidancePredicate.class);
private static final DynamicBooleanProperty ENABLED = DynamicPropertyFactory
.getInstance().getBooleanProperty(
"niws.loadbalancer.zoneAvoidanceRule.enabled", true);
public ZoneAvoidancePredicate(IRule rule, IClientConfig clientConfig) {
super(rule, clientConfig);
initDynamicProperties(clientConfig);
}
public ZoneAvoidancePredicate(LoadBalancerStats lbStats,
IClientConfig clientConfig) {
super(lbStats, clientConfig);
initDynamicProperties(clientConfig);
}
ZoneAvoidancePredicate(IRule rule) {
super(rule);
}
private void initDynamicProperties(IClientConfig clientConfig) {
if (clientConfig != null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + clientConfig.getClientName() + ".triggeringLoadPerServerThreshold", 0.2d);
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + clientConfig.getClientName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
}
@Override
public boolean apply(@Nullable PredicateKey input) {
if (!ENABLED.get()) {
return true;
}
String serverZone = input.getServer().getZone();
if (serverZone == null) {
// there is no zone information from the server, we do not want to filter
// out this server
return true;
}
LoadBalancerStats lbStats = getLBStats();
if (lbStats == null) {
// no stats available, do not filter
return true;
}
if (lbStats.getAvailableZones().size() <= 1) {
// only one zone is available, do not filter
return true;
}
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
if (!zoneSnapshot.keySet().contains(serverZone)) {
// The server zone is unknown to the load balancer, do not filter it out
return true;
}
logger.debug("Zone snapshots: {}", zoneSnapshot);
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null) {
return availableZones.contains(input.getServer().getZone());
} else {
return false;
}
}
}
注意事项
如果配置只对某个服务的Ribbon客户端生效,则CustomRibbonConfiguration类不能包含在主应用程序上下文的@CompantScan中,需要添加了自定义注解。
使用自定义注解和excludeFilters使CustomRibbonConfiguration类不@CompantScan扫描到!
CustomRibbonConfiguration只会注册在该服务的Spring容器中!也就是上文中提到的SpringClientFactory!
尾言
我不是头脑空空,我不是一只米虫!
网友评论