声明: 本文eureka客户端版本
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
启动流程
eureka客户端实现类: com.netflix.discovery.DiscoveryClient
eureka配置项实体类: org.springframework.cloud.netflix.eureka.EurekaClientConfigBean
1. 启动流程核心代码在DiscoveryClient构造函数中
2. 核心操作
2.1 是否强制在初始化阶段进行eureka注册
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
说明:
shouldRegisterWithEureka对应配置eureka.client.registerWithEureka,是否注册eureka如果为false则不启动整个DiscoveryClient,默认为true
shouldEnforceRegistrationAtInit对应配置eureka.client.shouldEnforceRegistrationAtInit,是否强制在初始化阶段进行进行eureka注册,默认false
综合俩个参数,eureka客户端在初始化阶段本意是不注册eureka,因为此时服务处于不可用状态!!
2.2 构造需要的定时任务(集群发现, 心跳, 实例副本...)
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
说明:
- 从eureka服务端拉取当前服务集群信息-task
- 心跳、实例信息副本-task
特殊:InstanceInfoReplicator 类实现Runnable包含注册逻辑,因此需要分析该类的触发
class InstanceInfoReplicator implements Runnable {
.......
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
this.discoveryClient = discoveryClient;
this.instanceInfo = instanceInfo;
this.scheduler = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
.setDaemon(true)
.build());
this.scheduledPeriodicRef = new AtomicReference<Future>();
this.started = new AtomicBoolean(false);
this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
this.replicationIntervalSeconds = replicationIntervalSeconds;
this.burstSize = burstSize;
this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
}
public void start(int initialDelayMs) {
//触发run方法
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
......
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
//触发run方法
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to stopped scheduler");
return false;
}
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
//触发注册接口
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
重点: start()、onDemandUpdate()方法包含对run方法的触发
start()方法在initScheduledTasks()方法被调用,代码如下:
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
说明: start()方法提交一个延迟任务,延迟时间getInitialInstanceInfoReplicationIntervalSeconds()对应配置eureka.client.initialInstanceInfoReplicationIntervalSeconds,默认值为40,因此该任务会在启动之后才被触发
onDemandUpdate()方法在initScheduledTasks()方法被调用,代码如下:
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
说明: eureka客户端启动时会有状态变更STARTING->UP,因此如果注册statusChangeListener,则notify()方法会被触发从而触发注册逻辑, shouldOnDemandUpdateStatusChange()直译为按需状态变更,对应配置项为eureka.client.onDemandUpdateStatusChange,默认值为true
综合: 默认情况下,由statusChangeListener触发eureka的注册逻辑,因此如果在启动过程中不注册则需要修改eureka.client.onDemandUpdateStatusChange=false
问题: 在关闭上述配置后,什么时候进行注册逻辑
- 实例副本任务(InstanceInfoReplicator),在初始化过程中会提交任务触发注册逻辑,但是触发时间不好控制,因为在初始化阶段提交,因此需要正确估算启动间隔!!
- 心跳任务(HeartbeatThread),会检测是否不存在(404),如果不存在才会触发注册逻辑,但是eureka有STARTING中间状态,因此如果远端eureka服务记录的当前实例处于该状态,服务依旧被认为不可用,因此该任务不可靠!!
自定义: 在tomcat启动之后触发注册逻辑
- 服务注册逻辑复用,org.springframework.cloud.netflix.eureka.http.RestTemplateEurekaHttpClient包含了所有eureka操作的restful接口
- spring boot启动的run方法中包含扩展点: org.springframework.boot.SpringApplicationRunListener,该类中定义了不同阶段会被调用的方法!!
综合上述细节产出如下扩展类
@Slf4j
public class EurekaRegisterListener implements SpringApplicationRunListener, Ordered {
private final SpringApplication application;
private final String[] args;
//注意该构造方法为官方指定,必须包含这俩个参数
public EurekaRegisterListener(SpringApplication sa, String[] arg) {
this.application = sa;
this.args = arg;
}
@Override
public int getOrder() {
//该listener最后被触发执行
return Ordered.LOWEST_PRECEDENCE;
}
@Override
public void starting() {
}
@Override
public void environmentPrepared(ConfigurableEnvironment environment) {
}
@Override
public void contextPrepared(ConfigurableApplicationContext context) {
}
@Override
public void contextLoaded(ConfigurableApplicationContext context) {
}
@Override
public void started(ConfigurableApplicationContext context) {
}
//run方法在刚刚启动的时候会调用一次,然后整体服务启动后还会被调用一次
@Override
public void running(ConfigurableApplicationContext context) {
//获取eureka服务端配置项
String eurekaServiceUrls = context.getEnvironment().getProperty("eureka.client.service-url.defaultZone");
if (StringUtils.isEmpty(eurekaServiceUrls)) {
log.error("not found eureka service for manual register!!!!");
return;
}
//第一次调用时上下文并没有被构造因此找bean时失败,才是会抛异常,因此此处捕获并忽略!!
EurekaInstanceConfigBean eurekaInstanceConfigBean;
try {
eurekaInstanceConfigBean = context.getBean(EurekaInstanceConfigBean.class);
} catch (Exception ignore) {
return;
}
//eureka的配置项支持多个地址并用逗号隔开,因此此处也做了兼容
String[] serviceUrlArr = eurekaServiceUrls.split(",");
for (String serviceUrl : serviceUrlArr) {
//轮询地址,构造restTemplate
EurekaHttpClient eurekaHttpClient = new RestTemplateTransportClientFactory().newClient(new DefaultEndpoint(serviceUrl));
//获取eureka根据配置文件构造出的实例对象
InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(eurekaInstanceConfigBean).get();
//此时直接将状态更该为UP,默认为STARTING虽然注册但是不可用
instanceInfo.setStatus(InstanceInfo.InstanceStatus.UP);
//发送rest请求去注册
EurekaHttpResponse<Void> register = eurekaHttpClient.register(instanceInfo);
//判断当前地址是成功注册
if (register.getStatusCode() == 204) {
log.info("success manual register eureka!!!");
return;
}
}
}
@Override
public void failed(ConfigurableApplicationContext context, Throwable exception) {
//启动失败时下线eureka实例,eureka内部实现直接拿过来用!
DiscoveryManager.getInstance().shutdownComponent();
}
}
说明: SpringApplicationRunListener使用spring的spi机制,因此需要在resource目录下增加META-INF目录,增加spring.factories文件,文件内容为: org.springframework.boot.SpringApplicationRunListener=
com.xxx.EurekaRegisterListener
总结
- 调整eureka.client.initialInstanceInfoReplicationIntervalSeconds时间,让服务启动后刚刚好到达任务执行时间
- 扩展SpringApplicationRunListener,将在服务启动后立即触发eureka的注册逻辑
网友评论