我们从源码来看:
服务启动之后,客户端的心跳包会从这个方法发送出去
image.png
在这里会进行发送心跳的逻辑:
initScheduledTasks();
image.png
然后就可以找到发送心跳的逻辑代码:
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
我们可以去看下TimedSupervisorTask这个类,构造函数里有很多时间的参数,比如超时时间还有间隔调用时间等,然后new HeartbeatThread() 就是主要的发送心跳的方法了。
点进去可以看到他是一个线程:
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
然后我们再看下这个线程执行的类: renew():
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
到这里我们可以发现与之前的注册到注册中心的源码比较类似,然后我们就到同样的那个类AbstractJerseyEurekaHttpClient:
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
我们可以断点过去,然后具体详细的查看各个参数信息。
当客户端的心跳发送出去之后,服务端就会接收到心跳然后做相应的处理,这块的源码是在这个类InstanceResource:
@PUT
public Response renewLease(@HeaderParam("x-netflix-discovery-replication") String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
boolean isSuccess = this.registry.renew(this.app.getName(), this.id, isFromReplicaNode);
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", this.app.getName(), this.id);
return Response.status(Status.NOT_FOUND).build();
} else {
Response response = null;
if (lastDirtyTimestamp != null && this.serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
if (response.getStatus() == Status.NOT_FOUND.getStatusCode() && overriddenStatus != null && !InstanceStatus.UNKNOWN.name().equals(overriddenStatus) && isFromReplicaNode) {
this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(), this.id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", new Object[]{this.app.getName(), this.id, response.getStatus()});
return response;
}
}
我们可以看到这是一个标准的REST接口,其中isFromReplicaNode代表的是是否从别的注册中心发送过来的心跳包,因为我们这里测试的不是集群,是从服务提供者发过来的,所以值是false。
然后我们查看下面的逻辑
image.png
大意就是找到需要续约的实例,进行续约,最后再发布一个续约成功的事件。
网友评论