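Ribbon documentation on the Spring website
Description of the Ribbon component on GitHub
In production, Ribbon is usually used together with Feign and Eureka. Analysing that combination, however, requires understanding three things: how Ribbon load-balances across statically configured services, how Feign calls a static service, and how the Eureka Client caches the services registered with the Eureka Server. Eureka was covered in an earlier post. This post records how Ribbon load-balances static services; once Feign's calls to static services have been analysed, Ribbon, Feign and Eureka can be put together for a complete write-up.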
1 Maven dependencies
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
2 Auto-configuration class
Ribbon's auto-configuration class is RibbonAutoConfiguration. Three of its important beans are listed here:
- SpringClientFactory
- RibbonClientHttpRequestFactory
- RestTemplateCustomizer
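The auto-configuration class declares few beans because SpringClientFactory brings in a further built-in configuration class, RibbonClientConfiguration. That configuration class takes effect per service provider; its beans are not registered as singletons in the calling service's own container (details follow later). Its more important beans are: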
- IClientConfig
- IRule
- IPing
- ServerList
- ServerListUpdater
- ILoadBalancer
- RibbonLoadBalancerContext
- RetryHandler
- RestClient
3 What the beans do
Beans in the auto-configuration class RibbonAutoConfiguration
- SpringClientFactory
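A bean managed by the container that also declares another important configuration class internally. It is mainly used to fetch components from a service provider's child container.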
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
- RibbonClientHttpRequestFactory
A bean configured in the inner class RibbonClientHttpRequestFactoryConfiguration; a factory that produces the client-side request (ClientHttpRequest).
@Bean
public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
return new RibbonClientHttpRequestFactory(this.springClientFactory);
}
- RestTemplateCustomizer
A custom RestTemplateCustomizer, written as a lambda, that sets the RequestFactory of Spring's RestTemplate to RibbonClientHttpRequestFactory.
@Bean
public RestTemplateCustomizer restTemplateCustomizer(
final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
return restTemplate -> restTemplate
.setRequestFactory(ribbonClientHttpRequestFactory);
}
// The lambda may be hard to read; here is the same bean written as a traditional anonymous class
@Bean
public RestTemplateCustomizer restTemplateCustomizer(
final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
return new RestTemplateCustomizer() {
@Override
public void customize(RestTemplate restTemplate) {
restTemplate.setRequestFactory(ribbonClientHttpRequestFactory);
}
};
}
Beans in the built-in configuration class RibbonClientConfiguration. They are defaults, each annotated with @ConditionalOnMissingBean, so they can be replaced by custom beans.
- IClientConfig
Configuration of the current service in its role as caller. loadProperties stores all configured and default client settings in the properties field of DefaultClientConfigImpl.
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadProperties(this.name);
config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
return config;
}
- IRule
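The load-balancing rule used when the current service calls another service, for example random, round-robin, weighted or least-concurrency. The rule configured here, ZoneAvoidanceRule, filters by zone and availability.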
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
- IPing
Similar to a heartbeat; used to check whether a service provider is alive.
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, name)) {
return this.propertiesFactory.get(IPing.class, config, name);
}
return new DummyPing();
}
- ServerList
The collection of service provider nodes.
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, name)) {
return this.propertiesFactory.get(ServerList.class, config, name);
}
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
}
- ServerListUpdater
Dynamically updates the service provider's node information.
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
- ILoadBalancer
The actual load balancer.
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
- RibbonLoadBalancerContext
Wraps the load balancer's context.
@Bean
@ConditionalOnMissingBean
public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
IClientConfig config, RetryHandler retryHandler) {
return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
}
- RetryHandler
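The retry strategy. The default only does simple exception handling; in real production use, a retry strategy should be customised for the business scenario.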
@Bean
@ConditionalOnMissingBean
public RetryHandler retryHandler(IClientConfig config) {
return new DefaultLoadBalancerRetryHandler(config);
}
- RestClient
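RestClient is not a bean of this configuration class itself; it is registered in the container because the configuration class imports another configuration class, RestClientRibbonConfiguration. The default implementation is OverrideRestClient, an inner class of RibbonClientConfiguration.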
@Bean
@Lazy
@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
public RestClient ribbonRestClient(IClientConfig config, ILoadBalancer loadBalancer,
ServerIntrospector serverIntrospector, RetryHandler retryHandler) {
RestClient client = new RibbonClientConfiguration.OverrideRestClient(config,
serverIntrospector);
client.setLoadBalancer(loadBalancer);
client.setRetryHandler(retryHandler);
return client;
}
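As a client-side load balancer, Ribbon has to cache the server's nodes itself (the different nodes of the same service) and periodically probe whether they are alive. On each call it picks a node according to the configured load-balancing rule. After a failed call it decides from the exception whether to retry.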
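The propertiesFactory.isSet(...) checks in the bean definitions above let a per-client property replace a default implementation. Below is a minimal sketch of that lookup in plain Java, assuming the `<clientName>.ribbon.<key>` property naming used by Spring Cloud Netflix (for example `stores.ribbon.NFLoadBalancerRuleClassName`). The class OverrideLookupSketch, the Rule interface and the two rule classes are hypothetical stand-ins, not Ribbon types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for IRule; not a Ribbon type.
interface Rule {
    String describe();
}

class DefaultRule implements Rule {
    public String describe() { return "default"; }
}

class CustomRule implements Rule {
    public String describe() { return "custom"; }
}

class OverrideLookupSketch {
    private static final String RULE_KEY = "NFLoadBalancerRuleClassName";

    // Models the environment: property name -> value.
    private final Map<String, String> properties = new HashMap<>();

    void setProperty(String key, String value) {
        properties.put(key, value);
    }

    // True when "<client>.ribbon.<key>" is configured with a non-empty value.
    boolean isSet(String client, String key) {
        String value = properties.get(client + ".ribbon." + key);
        return value != null && !value.trim().isEmpty();
    }

    // Instantiates the configured class, or falls back to the default rule.
    Rule resolveRule(String client) {
        if (!isSet(client, RULE_KEY)) {
            return new DefaultRule();
        }
        String className = properties.get(client + ".ribbon." + RULE_KEY);
        try {
            return (Rule) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create rule " + className, e);
        }
    }
}
```

Only the client whose property is set gets the custom class; every other client name still resolves to the default, which matches the per-provider scope described above.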
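4 Ribbon startup
- After the client service starts, the SpringClientFactory bean is instantiated
- Its constructor explicitly passes the built-in configuration class RibbonClientConfiguration to the parent class NamedContextFactory, where it is stored in the defaultConfigType field
- The other beans in RibbonAutoConfiguration are also created and registered in the container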
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
// SpringClientFactory class
public SpringClientFactory() {
super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
}
// Parent class NamedContextFactory
public NamedContextFactory(Class<?> defaultConfigType, String propertySourceName,
String propertyName) {
this.defaultConfigType = defaultConfigType;
this.propertySourceName = propertySourceName;
this.propertyName = propertyName;
}
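5 Creating the service provider's IoC container
Because load balancing is done against static services, the node information is written directly in the configuration file. Section 7.6 of the official documentation describes this; the format is: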
stores:
  ribbon:
    listOfServers: example.com,google.com
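After replacing the IP and port with the configured static service name (for example, http://example.com/getUser?id=1 becomes http://stores/getUser?id=1), calling the rewritten URL repeatedly with RestTemplate shows that load balancing takes effect (call: restTemplate.getForObject(url, Stores.class)).
- Calling RestTemplate's getForObject method goes through getForObject() > execute() > doExecute() in RestTemplate
- createRequest creates the client-side request
- execute performs the call to the configured static service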
@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
ClientHttpResponse response = null;
try {
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \"" + resource + "\": " + ex.getMessage(), ex);
}
finally {
if (response != null) {
response.close();
}
}
}
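When the client request is created, some configured components are fetched from clientFactory and wrapped into the client request RibbonHttpRequest, which is returned.
- createRequest is a method of the parent class HttpAccessor
- getRequestFactory returns RibbonClientHttpRequestFactory, so createRequest ends up in RibbonClientHttpRequestFactory (this was set by the RestTemplateCustomizer bean described above)
- In this.clientFactory.getClientConfig(serviceId), serviceId is the service provider's name (stores in the example) and clientFactory is the SpringClientFactory bean registered when the current service started; this is where the per-provider child IoC container is created
- The RibbonHttpRequest is built and returned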
// HttpAccessor
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
if (logger.isDebugEnabled()) {
logger.debug("HTTP " + method.name() + " " + url);
}
return request;
}
// RibbonClientHttpRequestFactory
public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod)
throws IOException {
String serviceId = originalUri.getHost();
if (serviceId == null) {
throw new IOException(
"Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
}
IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
RestClient client = this.clientFactory.getClient(serviceId, RestClient.class);
HttpRequest.Verb verb = HttpRequest.Verb.valueOf(httpMethod.name());
return new RibbonHttpRequest(originalUri, verb, client, clientConfig);
}
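The host check in createRequest can be tried out with java.net.URI alone. The sketch below is a simplified illustration, not the Spring Cloud class: ServiceIdSketch and serviceIdOf are hypothetical names, and an unchecked exception replaces the IOException used in the code above.

```java
import java.net.URI;

class ServiceIdSketch {
    // Mirrors the host check in createRequest: the URI host is used as the service id.
    static String serviceIdOf(URI uri) {
        String serviceId = uri.getHost();
        if (serviceId == null) {
            throw new IllegalArgumentException(
                    "Invalid hostname in the URI [" + uri.toASCIIString() + "]");
        }
        return serviceId;
    }

    public static void main(String[] args) {
        // The logical service name sits where a real host would normally be.
        System.out.println(serviceIdOf(URI.create("http://stores/getUser?id=1")));
    }
}
```

One practical consequence: java.net.URI reports a null host for names that are not syntactically valid hostnames, such as names containing an underscore, so a service name like that would take the error branch.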
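When getClientConfig in SpringClientFactory fetches the configuration, the call goes from SpringClientFactory to its parent class NamedContextFactory. The configuration is actually read from the child container of the service provider identified by name; if that child container does not exist yet, it is created and cached inside the calling service. The call chain is: getClientConfig() > getInstance() > parent getInstance()
- name is the service provider's name and type is the type of instance to fetch. The parent getInstance first obtains the service provider's context
- If the current service, as caller, has not cached that provider's context, it creates the context and caches it
- These per-provider contexts, that is, the child IoC containers, are all of type AnnotationConfigApplicationContext
- The built-in configuration class RibbonClientConfiguration, which SpringClientFactory passed to NamedContextFactory earlier, is then registered in the child IoC container
- After the calling service's container is set as the parent, the child container's refresh method is called to process the configuration class, so its beans are registered in that child container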
public <T> T getInstance(String name, Class<T> type) {
AnnotationConfigApplicationContext context = getContext(name);
if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
type).length > 0) {
return context.getBean(type);
}
return null;
}
protected AnnotationConfigApplicationContext getContext(String name) {
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}
protected AnnotationConfigApplicationContext createContext(String name) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name)
.getConfiguration()) {
context.register(configuration);
}
}
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
this.propertySourceName,
Collections.<String, Object>singletonMap(this.propertyName, name)));
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
// jdk11 issue
// https://github.com/spring-cloud/spring-cloud-netflix/issues/3101
context.setClassLoader(this.parent.getClassLoader());
}
context.setDisplayName(generateDisplayName(name));
context.refresh();
return context;
}
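The lazy, per-name creation of child containers can be modelled without Spring. The sketch below is only an illustration under stated assumptions: NamedContextCacheSketch and ChildContext are hypothetical names, a plain map stands in for the ApplicationContext, and ConcurrentHashMap.computeIfAbsent is swapped in for the double-checked locking used in getContext above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class NamedContextCacheSketch {
    // Hypothetical stand-in for a child ApplicationContext.
    static final class ChildContext {
        final String name;
        final Map<Class<?>, Object> beans = new ConcurrentHashMap<>();

        ChildContext(String name) {
            this.name = name;
        }
    }

    private final Map<String, ChildContext> contexts = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // Creates the child context on first use and reuses it afterwards.
    ChildContext getContext(String name) {
        return contexts.computeIfAbsent(name, this::createContext);
    }

    private ChildContext createContext(String name) {
        created.incrementAndGet();
        ChildContext context = new ChildContext(name);
        // Stands in for registering the default configuration class and refreshing.
        context.beans.put(String.class, "client-config-for-" + name);
        return context;
    }

    // Returns the bean of the given type from the named child context, or null.
    <T> T getInstance(String name, Class<T> type) {
        return type.cast(getContext(name).beans.get(type));
    }

    int createdCount() {
        return created.get();
    }
}
```

Asking twice for the same provider name returns the same cached object, while a new provider name triggers exactly one more creation.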
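6 Ribbon ping mechanism
When the child container is refreshed, all beans of the built-in configuration class are instantiated, including ILoadBalancer, whose implementation is ZoneAwareLoadBalancer. The call chain is: ZoneAwareLoadBalancer constructor > parent DynamicServerListLoadBalancer constructor > parent BaseLoadBalancer constructor > initWithConfig()
- setRule sets the load-balancing rule; if no rule is passed in, it defaults to RoundRobinRule
- setPing sets the ping strategy; the current default is DummyPing, which means the ping task is skipped
- If ping is enabled, the Timer built into BaseLoadBalancer starts a task; in the initWithConfig code below the interval falls back to 30 seconds when NFLoadBalancerPingInterval is not configured
- The ping itself runs in the pingServers method of the inner class SerialPingStrategy, which ultimately calls isAlive on the configured IPing bean
- Taking the implementation PingUrl as an example, isAlive sends a GET request with Apache HttpClient and checks whether the response status code is 200 to decide whether the service is alive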
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
// BaseLoadBalancer class
public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
}
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
this.config = clientConfig;
String clientName = clientConfig.getClientName();
this.name = clientName;
int pingIntervalTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerPingInterval,
Integer.parseInt("30")));
int maxTotalPingTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
Integer.parseInt("2")));
setPingInterval(pingIntervalTime);
setMaxTotalPingTime(maxTotalPingTime);
// cross associate with each other
// i.e. Rule,Ping meet your container LB
// LB, these are your Ping and Rule guys ...
setRule(rule);
setPing(ping);
setLoadBalancerStats(stats);
rule.setLoadBalancer(this);
if (ping instanceof AbstractLoadBalancerPing) {
((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
}
logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
boolean enablePrimeConnections = clientConfig.get(
CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);
if (enablePrimeConnections) {
this.setEnablePrimingConnections(true);
PrimeConnections primeConnections = new PrimeConnections(
this.getName(), clientConfig);
this.setPrimeConnections(primeConnections);
}
init();
}
public void setPing(IPing ping) {
if (ping != null) {
if (!ping.equals(this.ping)) {
this.ping = ping;
setupPingTask(); // since ping data changed
}
} else {
this.ping = null;
// cancel the timer task
lbTimer.cancel();
}
}
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
private boolean canSkipPing() {
if (ping == null
|| ping.getClass().getName().equals(DummyPing.class.getName())) {
// default ping, no need to set up timer
return true;
} else {
return false;
}
}
// PingUrl class
public boolean isAlive(Server server) {
String urlStr = "";
if (isSecure){
urlStr = "https://";
}else{
urlStr = "http://";
}
urlStr += server.getId();
urlStr += getPingAppendString();
boolean isAlive = false;
HttpClient httpClient = new DefaultHttpClient();
HttpUriRequest getRequest = new HttpGet(urlStr);
String content=null;
try {
HttpResponse response = httpClient.execute(getRequest);
content = EntityUtils.toString(response.getEntity());
isAlive = (response.getStatusLine().getStatusCode() == 200);
if (getExpectedContent()!=null){
LOGGER.debug("content:" + content);
if (content == null){
isAlive = false;
}else{
if (content.equals(getExpectedContent())){
isAlive = true;
}else{
isAlive = false;
}
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
// Release the connection.
getRequest.abort();
}
return isAlive;
}
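The ping flow described in this section (probe every cached server in turn, skip the work entirely when no real ping is configured) can be sketched in plain Java. Everything here is a hypothetical model: PingSketch and Node are made-up names, and a Predicate stands in for IPing.isAlive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class PingSketch {
    // Hypothetical stand-in for a server entry with an alive flag.
    static final class Node {
        final String id;
        volatile boolean alive = true;

        Node(String id) {
            this.id = id;
        }
    }

    private final List<Node> allServers = new ArrayList<>();
    private final Predicate<Node> ping; // null models "no ping configured"

    PingSketch(List<String> ids, Predicate<Node> ping) {
        for (String id : ids) {
            allServers.add(new Node(id));
        }
        this.ping = ping;
    }

    // One ping round: probe each server serially and record the result.
    void runPingRound() {
        if (ping == null) {
            return; // nothing to probe with, every server stays marked alive
        }
        for (Node node : allServers) {
            node.alive = ping.test(node);
        }
    }

    // Servers currently considered reachable.
    List<String> reachableServers() {
        List<String> up = new ArrayList<>();
        for (Node node : allServers) {
            if (node.alive) {
                up.add(node.id);
            }
        }
        return up;
    }
}
```

In a running client, runPingRound would be triggered on a timer rather than by hand; the sketch leaves scheduling out so that its behaviour stays deterministic.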
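7 Ribbon service caching
After the BaseLoadBalancer constructor finishes, control returns to the constructor of DynamicServerListLoadBalancer, where restOfInit() caches the configured static services and starts the task that refreshes them periodically
- In restOfInit, enableAndInitLearnNewServersFeature uses the default ServerListUpdater from the configuration class to run the default task
- The default updater is PollingServerListUpdater and the default task is a ServerListUpdater.UpdateAction
- UpdateAction's doUpdate method ends up calling updateListOfServers of DynamicServerListLoadBalancer
- updateListOfServers uses the configured ServerList bean to obtain the service providers; here that is ConfigurationBasedServerList, the static implementation from the configuration class
- The static implementation parses the server list from the configuration file, and updateAllServerList then tries to update it
- The servers are ultimately cached in the parent class BaseLoadBalancer, in the allServerList field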
// DynamicServerListLoadBalancer class
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
// PollingServerListUpdater class
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
// DynamicServerListLoadBalancer class
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
// ConfigurationBasedServerList class
@Override
public List<Server> getUpdatedListOfServers() {
String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
return derive(listOfServers);
}
protected List<Server> derive(String value) {
List<Server> list = Lists.newArrayList();
if (!Strings.isNullOrEmpty(value)) {
for (String s: value.split(",")) {
list.add(new Server(s.trim()));
}
}
return list;
}
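To make the parsing step concrete, here is a small self-contained sketch of turning a listOfServers value into host and port pairs. It is a simplified model rather than Ribbon's Server class: StaticServerListSketch and HostPort are hypothetical names, URL schemes are not handled, and the port default of 80 is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

class StaticServerListSketch {
    // Hypothetical value type; Ribbon's own Server class carries more state.
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Splits a comma-separated list such as "example.com,google.com:8080".
    // Assumption: entries without an explicit port default to 80.
    static List<HostPort> parse(String listOfServers) {
        List<HostPort> servers = new ArrayList<>();
        if (listOfServers == null || listOfServers.trim().isEmpty()) {
            return servers;
        }
        for (String entry : listOfServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon > 0) {
                servers.add(new HostPort(trimmed.substring(0, colon),
                        Integer.parseInt(trimmed.substring(colon + 1))));
            } else {
                servers.add(new HostPort(trimmed, 80));
            }
        }
        return servers;
    }
}
```

Because the list is re-read on every refresh cycle, a changed property value would be picked up by the next update without restarting the balancer, as long as the configuration source itself is reloaded.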
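8 Ribbon load balancing
Once the provider's child container is initialised, its beans have been instantiated and registered, and RestTemplate, holding the request it needs, starts the call to the provider (the request is the RibbonHttpRequest created by RibbonClientHttpRequestFactory)
- RestTemplate calls execute on the request; RibbonHttpRequest inherits execute from its parent class AbstractClientHttpRequest, and that method calls the request's own executeInternal method
- executeInternal calls executeWithLoadBalancer on the RestClient bean
- The RestClient is OverrideRestClient, the inner class of the configuration class RibbonClientConfiguration
- The method that runs is executeWithLoadBalancer of the parent class AbstractLoadBalancerAwareClient
- After buildLoadBalancerCommand builds a LoadBalancerCommand, what finally runs is the LoadBalancerCommand's submit method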
@Override
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
try {
addHeaders(headers);
if (outputStream != null) {
outputStream.close();
builder.entity(outputStream.toByteArray());
}
HttpRequest request = builder.build();
HttpResponse response = client.executeWithLoadBalancer(request, config);
return new RibbonHttpResponse(response);
}
catch (Exception e) {
throw new IOException(e);
}
}
// AbstractLoadBalancerAwareClient class
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
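The reconstructURIWithServer call above swaps the logical service name for the chosen server. A rough equivalent with java.net.URI is sketched below; UriRewriteSketch and withServer are hypothetical names, and the sketch works on decoded URI components, so it is not a drop-in replacement for Ribbon's own method.

```java
import java.net.URI;
import java.net.URISyntaxException;

class UriRewriteSketch {
    // Replaces the logical service name in the URI with a concrete host and port,
    // keeping scheme, path, query and fragment. Decoded components are passed to
    // the URI constructor, which quotes illegal characters again.
    static URI withServer(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI logical = URI.create("http://stores/getUser?id=1");
        System.out.println(withServer(logical, "example.com", 8080));
    }
}
```

The request the caller wrote against http://stores/... is therefore never sent as such; only the rewritten URI with a real host reaches the HTTP client.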
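The observer pattern (RxJava's Observable) is used here, so the code is not easy to follow at first reading. The overall flow: a suitable provider instance is chosen from the cached servers according to the load-balancing rule IRule; Ribbon's built-in RestClient then uses Jersey to make the HTTP call to that instance; for a failed request, the configured retry strategy decides whether to retry.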
// LoadBalancerCommand class
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
// Called for each attempt and retry
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
loadBalancerContext.noteOpenConnection(stats);
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
recordStats(tracer, stats, entity, null);
// TODO: What to do if onNext or onError are never called?
}
@Override
public void onError(Throwable e) {
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
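- selectServer picks a suitable provider node
- loadBalancerContext here is the RibbonLoadBalancerContext configured in the child container; the method actually called is getServerFromLoadBalancer of its parent class LoadBalancerContext
- When the RibbonLoadBalancerContext was constructed, the ILoadBalancer was set on LoadBalancerContext, so the ILoadBalancer can be obtained here and its chooseServer method called to select a server
- The chooseServer shown below is in BaseLoadBalancer, which implements ILoadBalancer, and it ultimately calls the choose method of IRule
- Taking the implementation RandomRule as an example, its strategy is to pick an instance at random from the provider's instances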
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
// LoadBalancerContext class
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
port = schemeAndPort.second();
}
// Various Supported Cases
// The loadbalancer to use and the instances it has is based on how it was registered
// In each of these cases, the client might come in using Full Url or Partial URL
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
// Partial URI or no URI Case
// well we have to just get the right instances from lb - or we fall back
if (lb != null){
Server svc = lb.chooseServer(loadBalancerKey);
if (svc == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
}
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
return svc;
} else {
// No Full URL - and we dont have a LoadBalancer registered to
// obtain a server
// if we have a vipAddress that came with the registration, we
// can use that else we
// bail out
if (vipAddresses != null && vipAddresses.contains(",")) {
throw new ClientException(
ClientException.ErrorType.GENERAL,
"Method is invoked for client " + clientName + " with partial URI of ("
+ original
+ ") with no load balancer configured."
+ " Also, there are multiple vipAddresses and hence no vip address can be chosen"
+ " to complete this partial uri");
} else if (vipAddresses != null) {
try {
Pair<String,Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
host = hostAndPort.first();
port = hostAndPort.second();
} catch (URISyntaxException e) {
throw new ClientException(
ClientException.ErrorType.GENERAL,
"Method is invoked for client " + clientName + " with partial URI of ("
+ original
+ ") with no load balancer configured. "
+ " Also, the configured/registered vipAddress is unparseable (to determine host and port)");
}
} else {
throw new ClientException(
ClientException.ErrorType.GENERAL,
this.clientName
+ " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
+ " Also has no vipAddress registered");
}
}
} else {
// Full URL Case
// This could either be a vipAddress or a hostAndPort or a real DNS
// if vipAddress or hostAndPort, we just have to consult the loadbalancer
// but if it does not return a server, we should just proceed anyways
// and assume its a DNS
// For restClients registered using a vipAddress AND executing a request
// by passing in the full URL (including host and port), we should only
// consult lb IFF the URL passed is registered as vipAddress in Discovery
boolean shouldInterpretAsVip = false;
if (lb != null) {
shouldInterpretAsVip = isVipRecognized(original.getAuthority());
}
if (shouldInterpretAsVip) {
Server svc = lb.chooseServer(loadBalancerKey);
if (svc != null){
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("using LB returned Server: {} for request: {}", svc, original);
return svc;
} else {
// just fall back as real DNS
logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
}
} else {
// consult LB to obtain vipAddress backed instance given full URL
//Full URL execute request - where url!=vipAddress
logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
}
}
// end of creating final URL
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
}
// just verify that at this point we have a full URL
return new Server(host, port);
}
// BaseLoadBalancer class
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
// RandomRule class
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
if (Thread.interrupted()) {
return null;
}
List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
/*
* No servers. End regardless of pass, because subsequent passes
* only get more restrictive.
*/
return null;
}
int index = chooseRandomInt(serverCount);
server = upList.get(index);
if (server == null) {
/*
* The only time this should happen is if the server list were
* somehow trimmed. This is a transient condition. Retry after
* yielding.
*/
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Shouldn't actually happen.. but must be transient or a bug.
server = null;
Thread.yield();
}
return server;
}
protected int chooseRandomInt(int serverCount) {
return ThreadLocalRandom.current().nextInt(serverCount);
}
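For comparison, the two simplest selection strategies mentioned in this post, random and round-robin, fit in a few lines of plain Java. This is a sketch under stated assumptions, not Ribbon's implementation: RuleSketch is a hypothetical class, servers are plain strings, and the random index is bounded by the size of the reachable list, whereas the RandomRule code above draws it from the total server count.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

class RuleSketch {
    // Shared position for round-robin selection.
    private final AtomicInteger next = new AtomicInteger();

    // Random choice among the reachable servers; null when none are available.
    static String chooseRandom(List<String> reachable) {
        if (reachable.isEmpty()) {
            return null;
        }
        return reachable.get(ThreadLocalRandom.current().nextInt(reachable.size()));
    }

    // Round-robin choice: each call advances the shared counter by one.
    String chooseRoundRobin(List<String> reachable) {
        if (reachable.isEmpty()) {
            return null;
        }
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), reachable.size());
        return reachable.get(index);
    }
}
```

With the two servers from the example configuration, successive round-robin calls alternate between example.com and google.com, while the random variant may return either on any call.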
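9 Ribbon request to the server
Once AbstractLoadBalancerAwareClient has obtained the provider's information through the load-balancing strategy, Ribbon starts the request to the server. The request is implemented with Jersey.
- With the server information in hand, the request is sent in AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)
- this is the configured RestClient bean, OverrideRestClient, whose execute method is inherited from the parent class RestClient
- The parent RestClient's execute method uses Jersey to access the server-side resource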
@Override
public HttpResponse execute(HttpRequest task, IClientConfig requestConfig) throws Exception {
IClientConfig config = (requestConfig == null) ? task.getOverrideConfig() : requestConfig;
return execute(task.getVerb(), task.getUri(),
task.getHeaders(), task.getQueryParams(), config, task.getEntity());
}
private HttpResponse execute(HttpRequest.Verb verb, URI uri,
Map<String, Collection<String>> headers, Map<String, Collection<String>> params,
IClientConfig overriddenClientConfig, Object requestEntity) throws Exception {
HttpClientResponse thisResponse = null;
boolean bbFollowRedirects = bFollowRedirects;
// read overriden props
if (overriddenClientConfig != null
// set whether we should auto follow redirects
&& overriddenClientConfig.getProperty(CommonClientConfigKey.FollowRedirects)!=null){
// use default directive from overall config
Boolean followRedirects = Boolean.valueOf(""+overriddenClientConfig.getProperty(CommonClientConfigKey.FollowRedirects, bFollowRedirects));
bbFollowRedirects = followRedirects.booleanValue();
}
restClient.setFollowRedirects(bbFollowRedirects);
if (logger.isDebugEnabled()) {
logger.debug("RestClient sending new Request(" + verb
+ ": ) " + uri);
}
WebResource xResource = restClient.resource(uri.toString());
if (params != null) {
for (Map.Entry<String, Collection<String>> entry: params.entrySet()) {
String name = entry.getKey();
for (String value: entry.getValue()) {
xResource = xResource.queryParam(name, value);
}
}
}
ClientResponse jerseyResponse;
Builder b = xResource.getRequestBuilder();
if (headers != null) {
for (Map.Entry<String, Collection<String>> entry: headers.entrySet()) {
String name = entry.getKey();
for (String value: entry.getValue()) {
b = b.header(name, value);
}
}
}
Object entity = requestEntity;
switch (verb) {
case GET:
jerseyResponse = b.get(ClientResponse.class);
break;
case POST:
jerseyResponse = b.post(ClientResponse.class, entity);
break;
case PUT:
jerseyResponse = b.put(ClientResponse.class, entity);
break;
case DELETE:
jerseyResponse = b.delete(ClientResponse.class);
break;
case HEAD:
jerseyResponse = b.head();
break;
case OPTIONS:
jerseyResponse = b.options(ClientResponse.class);
break;
default:
throw new ClientException(
ClientException.ErrorType.GENERAL,
"You have to one of the REST verbs such as GET, POST etc.");
}
thisResponse = new HttpClientResponse(jerseyResponse, uri, overriddenClientConfig);
if (thisResponse.getStatus() == 503){
thisResponse.close();
throw new ClientException(ClientException.ErrorType.SERVER_THROTTLED);
}
return thisResponse;
}
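10 Ribbon retry mechanism
The retry mechanism covers the number of retries on the same instance, the number of retries on other instances, whether to retry at all, and under which conditions. Once retries are configured, after a failed request Ribbon uses the retry strategy to decide whether to retry on the same instance or to pick another instance through the load-balancing rule, without exceeding the configured retry limits. The retry strategy is applied in the retryPolicy method of the LoadBalancerCommand shown above
- retryPolicy calls the retryHandler's isRetriableException method to decide whether to retry
- With the default RequestSpecificRetryHandler, GET requests are retried on any error; other requests are retried only on connection exceptions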
private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
return new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer tryCount, Throwable e) {
if (e instanceof AbortExecutionException) {
return false;
}
if (tryCount > maxRetrys) {
return false;
}
if (e.getCause() != null && e instanceof RuntimeException) {
e = e.getCause();
}
return retryHandler.isRetriableException(e, same);
}
};
}
// RequestSpecificRetryHandler class
@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
if (okToRetryOnAllErrors) {
return true;
}
else if (e instanceof ClientException) {
ClientException ce = (ClientException) e;
if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
return !sameServer;
} else {
return false;
}
}
else {
return okToRetryOnConnectErrors && isConnectionException(e);
}
}
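The interplay of the two retry limits can be shown with ordinary loops instead of Observables. The sketch below is a simplified model under stated assumptions: RetrySketch is a hypothetical class, every RuntimeException is treated as retriable (a real handler would consult isRetriableException first), and the next server is taken from the list in order rather than through an IRule.

```java
import java.util.List;
import java.util.function.Function;

class RetrySketch {
    // Tries each server up to (1 + maxSame) times and moves on to at most
    // maxNext further servers. Rethrows the last failure when every attempt fails.
    static <T> T execute(List<String> servers, int maxSame, int maxNext,
                         Function<String, T> call) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("no servers available");
        }
        if (maxSame < 0 || maxNext < 0) {
            throw new IllegalArgumentException("retry limits must not be negative");
        }
        RuntimeException last = null;
        for (int s = 0; s <= maxNext; s++) {
            // Stand-in for asking the load balancer for the next server.
            String server = servers.get(s % servers.size());
            for (int attempt = 0; attempt <= maxSame; attempt++) {
                try {
                    return call.apply(server);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
        }
        throw last;
    }
}
```

With one retry on the same server and one on the next, a call whose first server is down is attempted twice there and then once on the second server, for three attempts in total.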