Spring Cloud Ribbon 源码分析
前言
原理介绍
ribbon提供了http请求负载均衡的能力,既然要扩展调度能力,就需要在请求之前,通过某种调度策略选择合适的server来调用。
想要实现这个效果需要以下几步:
- 对http的请求已经拦截,对其进行扩展
- 通过调度策略选择合适的server
- 改写http的ip和port来定向调用
源码分析
1. http请求拦截
首先介绍ribbon是如何进行给请求增加拦截器,需要用到LoadBalancerAutoConfiguration
LoadBalancerAutoConfiguration
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
LoadBalancerAutoConfiguration的restTemplates通过@LoadBalanced注解,将所有带有此注解的RestTemplate的对象都注入到list中。@LoadBalanced内部其实是继承@Qualifier实现。
通过RestTemplateCustomizer来对每个restTemplate实现扩展,将LoadBalancerInterceptor也加入到拦截器列表中
在整个restTemplate的调用过程中,会从getForObject -> execute -> doExecute -> request.execute 中来对拦截器进行处理。
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
// ...
ClientHttpResponse response = null;
try {
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
// ...
}
finally {
if (response != null) {
response.close();
}
}
}
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
一直调用到InterceptingRequestExecution的execute方法中,这里先判断当前是否有拦截器,如果存在则调用intercept,我们这里通过LoadBalancerAutoConfiguration注入了LoadBalancerInterceptor,具体看下LoadBalancerInterceptor的实现。
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
这里的loadBalancer是在创建Interceptor时,注入的RibbonLoadBalancerClient,自动注入的过程在RibbonAutoConfiguration中
RibbonAutoConfiguration
public class RibbonAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
}
至此完成了前期的请求调用拦截的部分,开始进入ribbon调度策略的选择环节。
2. 调度策略
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
首先实例化ILoadBalancer的bean,默认会注入ZoneAwareLoadBalancer的bean,ZoneAwareLoadBalancer的继承关系如下图。
ILoadBalancer的注入实现在RibbonClientConfiguration中。
RibbonClientConfiguration
// client的请求的默认配置
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadProperties(this.name);
config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
return config;
}
// 调度规则
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
// 维持心跳ping
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, name)) {
return this.propertiesFactory.get(IPing.class, config, name);
}
return new DummyPing();
}
// 服务端提供的server列表
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, name)) {
return this.propertiesFactory.get(ServerList.class, config, name);
}
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
}
// server列表动态更新
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
// loadbalance的客户端对象
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
// server列表选择过滤器
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
return this.propertiesFactory.get(ServerListFilter.class, config, name);
}
ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
filter.initWithNiwsConfig(config);
return filter;
}
// loadbalancer的上下文配置
@Bean
@ConditionalOnMissingBean
public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
IClientConfig config, RetryHandler retryHandler) {
return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
}
// 重试处理器
@Bean
@ConditionalOnMissingBean
public RetryHandler retryHandler(IClientConfig config) {
return new DefaultLoadBalancerRetryHandler(config);
}
// server 检查器(内省),检查是否安全端口,获取基本信息等。
@Bean
@ConditionalOnMissingBean
public ServerIntrospector serverIntrospector() {
return new DefaultServerIntrospector();
}
ZoneAwareLoadBalancer
的实例化还依赖的其他的bean,如ZoneAvoidanceRule
、DummyPing
、ConfigurationBasedServerList
、DefaultLoadBalancerRetryHandler
等
ZoneAwareLoadBalancer
ZoneAwareLoadBalancer 在构造方法中,会完成一系列的初始化动作,如serverList的设置、开启server刷新任务,初始化loadBalanceStat,配置负载均衡策略、ping策略、filter等操作。
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
restOfInit()
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
在restOfInit方法中,有两个关键方法:enableAndInitLearnNewServersFeature
和updateListOfServers
enableAndInitLearnNewServersFeature()
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
可以看到这个方法开启了一个schedule,这个schedule的用途是每隔一段时间刷新serverList。这里传入了一个UpdateAction,这个UpdateAction是
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
通过调用updateListOfServers来刷新serverList,而刷新的时间间隔refreshIntervalMs默认是30s。
updateListOfServers
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
这里的servers其实就是我们配置的listOfServers列表,在获取到最新的servers之后,会调用updateAllServerList
来更新缓存。
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
@Override
public void setServersList(List lsrv) {
super.setServersList(lsrv);
List<T> serverList = (List<T>) lsrv;
Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
for (Server server : serverList) {
// make sure ServerStats is created to avoid creating them on hot
// path
getLoadBalancerStats().getSingleServerStat(server);
String zone = server.getZone();
if (zone != null) {
zone = zone.toLowerCase();
List<Server> servers = serversInZones.get(zone);
if (servers == null) {
servers = new ArrayList<Server>();
serversInZones.put(zone, servers);
}
servers.add(server);
}
}
setServerListForZones(serversInZones);
}
首先在super.setSersList中,将全局的server列表,也就是allServerList做更新,
在for循环中,调用getSingleServerStat
来刷新一下内部cache,最后设置zone。
这里需要说明一下LoadBalancerStats
LoadBalancerStats
public class LoadBalancerStats implements IClientConfigAware {
private static final String PREFIX = "LBStats_";
String name;
// Map<Server,ServerStats> serverStatsMap = new ConcurrentHashMap<Server,ServerStats>();
volatile Map<String, ZoneStats> zoneStatsMap = new ConcurrentHashMap<String, ZoneStats>();
volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>();
private volatile CachedDynamicIntProperty connectionFailureThreshold;
private volatile CachedDynamicIntProperty circuitTrippedTimeoutFactor;
private volatile CachedDynamicIntProperty maxCircuitTrippedTimeout;
private static final DynamicIntProperty SERVERSTATS_EXPIRE_MINUTES =
DynamicPropertyFactory.getInstance().getIntProperty("niws.loadbalancer.serverStats.expire.minutes", 30);
private final LoadingCache<Server, ServerStats> serverStatsCache =
CacheBuilder.newBuilder()
.expireAfterAccess(SERVERSTATS_EXPIRE_MINUTES.get(), TimeUnit.MINUTES)
.removalListener(new RemovalListener<Server, ServerStats>() {
@Override
public void onRemoval(RemovalNotification<Server, ServerStats> notification) {
notification.getValue().close();
}
})
.build(
new CacheLoader<Server, ServerStats>() {
public ServerStats load(Server server) {
return createServerStats(server);
}
});
ribbon通过这个对象来记录每个server的运行特征和统计数据,如各种连接时间,响应时间,熔断配置,请求次数,serverStat,zoneStat等。
LoadBalancerStats的这些参数,会作为负载均衡选择策略的重要参考依据。
至此,ZoneAwareLoadBalancer
的bean实例化完成。
回到调度选择的开始处,在完成loadBalance的instance之后,开始选择Server。
getServer
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// Use 'default' on a null hint, or just pass it on?
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
chooseServer
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
选择分两部分
- 如果没有配置zone,即只有一个zone,直接调用super.chooseServer。
- 如果有一个以上的zone, 则根据zone的算法选择一个server。
先看一下不使用zone算法的实现。
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
增加调用次数,并通过role的实现选择一个server。
这里的zone是ZoneAvoidanceRule
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
内部通过轮询的机制,来选择server,除了ZoneAvoidanceRule的实现之外,还有提供了很多的其他rule策略,如下图:
比如WeighedResponseTimeRule就是通过响应时间来动态选择server,具体算法的数据就是依赖LoadBalanceStat的数据统计。
3. uri改写和调用
继续回到调度的最开始,选择好server之后,就这些对象封装成RibbonServer,就继续执行execute方法。
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
这个方法主要就是request.apply的调用。
request是在前面的调用中,this.requestFactory.createRequest(request, body, execution) 来构建。
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest, instance);
}
}
return execution.execute(serviceRequest, body);
};
}
可以看到这里通过定义了一个匿名内部类,定义了apply这个方法的实现。
封装了一个ServiceRequestWrapper,并提供了一个扩展口,可以自定义LoadBalancerRequestTransformer来改变原有的一些实现,如可以改变之前的server选择。
转化完毕后,继续执行execute。
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
这里重新创建了ClientHttpRequest,在getURI中,重构了uri的地址,将之前uri中的服务名,改成了选择的ip和端口,重新设置headers后,发起真正的request调用。
至此,一个完成的请求的ribbon负载过程执行完成。
网友评论