应用服务实例下线
1、Eureka Client发起下线
应用实例关闭时,Eureka Client向Eureka Server发起下线应用实例。需要满足如下条件才可发起:
- 配置eureka.registration.enabled=true,应用实例开启注册开关。默认为false
- 配置eureka.shouldUnregisterOnShutdown=true,应用实例开启关闭时下线开关。默认为true
Eureka Client端有一个shutdown方法,服务实例下线的时候,会主动调用这个方法。
@Singleton
public class DiscoveryClient implements EurekaClient {
/**
* 关闭Eureka客户端。还将注销请求发送到Eureka服务器
*/
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
//将定时调度任务都停止
cancelScheduledTasks();
// If APPINFO was registered
//如果注册到了Eureka Server,通知Server下线
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
//服务下线
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
Monitors.unregisterObject(this);
logger.info("Completed shut down of DiscoveryClient");
}
}
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
// 调用AbstractJerseyEurekaHttpClient的cancel方法
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
}
将定时调度任务都停止——cancelScheduledTasks()
cancelScheduledTasks方法中会将client端启动的定时任务都停止,包括:心跳续约,定时拉取增量注册表等等。
@Singleton
public class DiscoveryClient implements EurekaClient {
private void cancelScheduledTasks() {
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
if (cacheRefreshTask != null) {
cacheRefreshTask.cancel();
}
if (heartbeatTask != null) {
heartbeatTask.cancel();
}
}
}
AbstractJerseyEurekaHttpClient的cancel()方法使用DELETE请求调用Eureka Server的apps/${APP_NAME}/${INSTANCE_INFO_ID}
接口,实现应用实例信息的下线
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder.delete(ClientResponse.class);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
}
2、Eureka Server接收下线
Eureka Server接收下线请求核心流程如下图:
1)、接收下线请求
@Produces({"application/xml", "application/json"})
public class InstanceResource {
private final PeerAwareInstanceRegistry registry;
@DELETE
public Response cancelLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
try {
// 服务下线
boolean isSuccess = registry.cancel(app.getName(), id,
"true".equals(isReplication));
if (isSuccess) {
logger.debug("Found (Cancel): {} - {}", app.getName(), id);
return Response.ok().build();
} else {
logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
} catch (Throwable e) {
logger.error("Error (cancel): {} - {}", app.getName(), id, e);
return Response.serverError().build();
}
}
}
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
@Override
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
// 调用父类方法下线服务实例
if (super.cancel(appName, id, isReplication)) {
// Eureka Server复制
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
return true;
}
return false;
}
}
PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的cancel(...)方法下线应用实例信息。
2)、下线应用实例信息
调用了AbstractInstanceRegistry的cancel(...)方法,下线应用实例信息,代码如下:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
@Override
public boolean cancel(String appName, String id, boolean isReplication) {
return internalCancel(appName, id, isReplication);
}
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
// 获得读锁
read.lock();
// 增加取消注册次数到监控
CANCEL.increment(isReplication);
// 获取appName 对应的所有实例集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
// 移除租约映射
if (gMap != null) {
// 根据实例id 移除对应的实例租约信息
leaseToCancel = gMap.remove(id);
}
// 添加到最近取消注册的队列,用于统计
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
// 移除应用实例覆盖状态映射
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
// 租约不存在
if (leaseToCancel == null) {
// 添加取消注册不存在到监控
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
// 设置租约的取消注册时间戳
leaseToCancel.cancel();
// 添加到最近租约变更记录队列
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
//放入最近改变队列中
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// 设置响应缓存过期
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
}
} finally {
// 释放锁
read.unlock();
}
// 更新 需要发送心跳的客户端数量
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to cancel it, reduce the number of clients to send renews.
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
updateRenewsPerMinThreshold();
}
}
return true;
}
}
更新需要发送续约信息客户端数量,毕竟是服务下线,就少了一个需要发送续约信息的客户端,更新自我保护机制触发阈值,这块涉及到Eureka Server 自我保护机制实现原理。
public class Lease<T> {
enum Action {
Register, Cancel, Renew
};
private long evictionTimestamp;
public void cancel() {
if (evictionTimestamp <= 0) {
// 设置取消注册时间戳
evictionTimestamp = System.currentTimeMillis();
}
}
}
方法逻辑不复杂,有四步:
- 1、从原始注册表数据结构中,删除下线的实例
- 2、对实例执行下线的逻辑,其实没什么逻辑,就是保存了一下下线的时间戳evictionTimestamp
- 3、放入最近变更队列中,以便client拉取增量注册表的时候可以感知到有服务实例下线了
- 4、清空读写缓存,避免client拉取缓存时拉取到已经下线的服务实例
服务下线不会主动通知其他客户端,而是等其他客户端来拉取增量注册表的时候,才会感知到。
参考:
https://blog.csdn.net/qq_40378034/article/details/119079180
网友评论