Eureka源码分析(2.1.4.Release)
首先源码切忌一行一行debug,需先了解eureka主要功能后,再分析其功能如何实现。
image.png
主要功能
- 服务注册:客户端向eureka服务端注册自己的服务,服务端将信息存于多级缓存中。
- 服务续约:服务注册后,客户端会定时发送心跳包来保持可用性,避免被剔除。每30秒(eureka.instance.leaseRenewallIntervalInSeconds)发送一次心跳来进行服务续约。
- 服务同步:eureka服务端之间互相进行注册,构建server集群,不同Server之间会进行服务同步,用来保证服务信息的一致性。
- 服务列表获取:客户端请求eureka server获取注册的服务清单,并缓存到客户端本地。默认间隔30秒。(eureka.client.registryFetchIntervalSeconds)
fetchRegistry:true 当值为true时,会定期从eureka服务端获取列表,集群中的eureka服务端也会从其他的eureka节点中获取列表)
- 远程调用:服务调用方在获取到服务清单后,就可以根据自身的一个负载均衡策略找到一个服务url,并对他发起请求。
- 服务下线:客户端关闭或重启时,需告诉server端该服务已下线。服务端在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去
- 服务剔除:如果客户端没有正常下线,需要有剔除机制,将不可用的服务删除掉。EurekaServer 在启动时会创建一个定时任务,每隔默认60秒,从当前服务清单中把超时没有续约(默认90秒,eureka.instance.leaseExpirationDurationInSeconds)的服务剔除
- 自我保护:当短时间内,统计续约失败的比例,如果达到一定阈值,就会触发自我保护机制。在该机制下,Eureka Server不会剔除任何微服务。等到一分钟内最近续约个数大于一分钟内最小续约,再退出自我保护机制。自我保护开关(eureka.server.enable-self-preservation:false)
Eureka保证AP
什么是AP?
著名的CAP理论指出,一个分布式系统不可能同时满足C(一致性)、A(可用性)和P(分区容错性)。
由于分区容错性在是分布式系统中必须要保证的,因此我们只能在A和C之间进行权衡。Eureka选择了保证AP。
- 如一个客户端因网络原因在一个eureka服务端无法获取服务列表时,可以从其他eureka服务端获取数据,即使与之前的eureka服务列表不一致;
- eureka服务端有同步机制,保证了服务列表的最终一致性(非强一致性);
客户端初始化
这里简要分析一下
springboot会读取spring-cloud-netflix-eureka-client/2.1.1.RELEASE/spring-cloud-netflix-eureka-client-2.1.1.RELEASE.jar!/META-INF/spring.factories文件.(详情请看上一篇springboot自动装载)
自动初始化EnableAutoConfiguration下的所有配置类,内容如下:
image.png
重点看EurekaClientAutoConfiguration类
//初始化eureka客户端
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
image.png
CloudEurekaClient继承了DiscoveryClient类,该类为原生eureka类,所以springcloud的eureka客户端实际是封装了原生eureka客户端,分析发现,eureka主要功能都在原生类上,所以直接看原生eureka类的初始化代码就行了。
简要初始化流程如下:
image.png
具体初始化代码:com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<com.netflix.discovery.BackupRegistry>)
代码片段:
……忽略上部分代码……
//实例化一个线程池(核心线程:2个,分别对应心跳续约任务和获取服务列表任务)
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
//声明心跳续约执行器(续约)
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
);
//声明更新服务列表缓存执行器(获取任务列表)
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
);
……忽略部分代码……
//重点看这里:初始化定时任务
initScheduledTasks();
com.netflix.discovery.DiscoveryClient#initScheduledTasks:
private void initScheduledTasks() {
//默认为true,只有eureka服务端非集群模式时,该设置可以为false
if (clientConfig.shouldFetchRegistry()) {
//默认eureka.client.registryFetchIntervalSeconds=30,延时30秒执行
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
//默认eureka.client.cacheRefreshExecutorExponentialBackOffBound=10,如果发生超时的重试延迟的最大乘数。
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//其中scheduler和cacheRefreshExecutor为之前提到的声明对象,其实就是初始化了一个cacheRefresh名字的线程执行任务
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
……忽略下半部分代码(与服务注册任务相关)……
在这里可以看到scheduler.schedule方法就是在定义一个延时任务,30秒后执行。但是按理说应该定义一个定时任务定时获取服务列表才对。为什么要这样定义呢?下面重点看看TimedSupervisorTask
初始化实际赋值:
//上文提到的线程池
this.scheduler = scheduler;
//上文提到的缓存更新执行器
this.executor = cacheRefreshExecutor;
//超时时间:上文提到的默认30秒
this.timeoutMillis = timeUnit.toMillis(registryFetchIntervalSeconds);
//执行的真正任务
this.task = new CacheRefreshThread();
//延迟时间:上文提到的默认30秒
this.delay = new AtomicLong(timeoutMillis);
//最大延迟时间:30秒*10=300秒,有什么用呢?看下面解释
this.maxDelay = timeoutMillis * expBackOffBound
这里说一下TimedSupervisorTask的执行逻辑,非固定的执行时间,而是采用动态间隔时间,去执行获取服务列表任务。(在平常工作中,也可以效仿此方法实现动态设置任务)
具体实现流程图如下:
image.png
具体实现代码:
com.netflix.discovery.TimedSupervisorTask#run:
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
//执行任务在默认超时时间内。
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
//如果任务没超时,则把延时时间设置为默认的超时时间30秒
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
//如果超时
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
//设置最新延时时间:将当前超时时间*2,与最大延时比较,即超时时间不能超过300秒
long newDelay = Math.min(maxDelay, currentDelay * 2);
//CAS来控制多线程同步
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
if (future != null) {
future.cancel(true);
}
//又调用一个延时任务,延时时间为新设置的延时时间
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
服务列表获取
下面再来看看CacheRefreshThread的实现逻辑
com.netflix.discovery.DiscoveryClient#refreshRegistry->
com.netflix.discovery.DiscoveryClient#fetchRegistry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
//获取本地缓存:localRegionApps.get()
Applications applications = getApplications();
//判断是全量更新还是增量更新
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
//全量获取
getAndStoreFullRegistry();
} else {
//增量获取
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
……忽略下部分代码……
}
全量获取:
com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry->
eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())->
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplications->
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplicationsInternal
image.png
增量获取:
com.netflix.discovery.DiscoveryClient#getAndUpdateDelta:
image.png
eueka不是每次都是采用全量获取服务列表,而是默认采用增量方式,获取变化的实例。
当然为保证获取到的服务列表是最新的,获取增量的同时,eureka服务端也会把最新的服务列表hashcode返回,客户端将此hashcode与更新后的本地缓存hashcode作对比,如果不相同,则重新进行全量获取。
可以学习的地方:
- TimeSupervisorTask是一个动态时间间隔任务,如果遇到执行任务超时,则扩大为原来间隔时间的两倍,一直到延时时间的阈值。一旦新任务不再超时,间隔时间又恢复为初始值。如服务列表获取任务默认为每隔30秒执行,随着执行时间超时扩大为原来的2倍再执行,直到到达阈值。如果执行未超时,则设置为原来的间隔时间执行。
- 采用CAS来控制多线程同步。
- 增量获取数据列表。尽量避免全量获取(当数据量很大的时候)
- 服务列表保存在本地缓存,每次远程调用,不需要经过注册中心,提高执行速度。当eureka服务端挂了的时候,eureka客户端还能通过本地缓存的服务列表正常调用远程服务。
网友评论