1 Understanding Eureka
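Service registration
- A service provider is essentially a Eureka client: at startup it calls Eureka's registration API to register its own information with the Eureka server.
- The Eureka server updates its own state and then replicates the change to the other Eureka server nodes.
- The Eureka server maintains the list of registered services.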
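Service renewal
- After registering with the Eureka server, the Eureka client sends a heartbeat to the server every 30 seconds by default.
- Configured with eureka.instance.lease-renewal-interval-in-seconds=30
- The Eureka server updates its own state and then replicates the renewal to the other Eureka server nodes.
- If the Eureka server receives no heartbeat from the client within the default 90 seconds, i.e. three heartbeats in a row are missed, it evicts the instance from its registry.
- Configured with eureka.instance.lease-expiration-duration-in-seconds=90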
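The 30s/90s arithmetic above can be checked with a toy lease model. This is a minimal sketch under our own assumptions, not Eureka's `Lease` class: `SimpleLease` and its methods are hypothetical, timestamps are plain milliseconds supplied by the caller, and we assume a lease counts as expired once the full duration has elapsed since the last renewal.

```java
// Hypothetical toy lease, not com.netflix.eureka.lease.Lease.
final class SimpleLease {
    private final long durationMillis;   // e.g. lease-expiration-duration-in-seconds * 1000
    private long lastRenewalMillis;      // time of registration or of the last heartbeat

    SimpleLease(long registeredAtMillis, long durationMillis) {
        this.lastRenewalMillis = registeredAtMillis;
        this.durationMillis = durationMillis;
    }

    // A heartbeat moves the expiry window forward.
    void renew(long nowMillis) {
        lastRenewalMillis = nowMillis;
    }

    // Assumed boundary: expired once the full duration has passed without a renewal.
    boolean isExpired(long nowMillis) {
        return nowMillis >= lastRenewalMillis + durationMillis;
    }
}

class SimpleLeaseDemo {
    public static void main(String[] args) {
        long renewalIntervalMillis = 30_000;   // lease-renewal-interval-in-seconds=30
        long expirationMillis = 90_000;        // lease-expiration-duration-in-seconds=90
        SimpleLease lease = new SimpleLease(0, expirationMillis);
        lease.renew(renewalIntervalMillis);    // last heartbeat at t=30s, then the client goes silent
        // heartbeats due at 60s, 90s and 120s are missed: three in a row
        System.out.println(lease.isExpired(119_999)); // false
        System.out.println(lease.isExpired(120_000)); // true
    }
}
```

With the defaults, three consecutive missed heartbeats are exactly what it takes for a lease to lapse, after which the server may evict the instance.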
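Service shutdown (cancel) and eviction
- When an instance shuts down, it first sends a cancel request to the Eureka server, which then removes the instance from its registry. An instance that disappears without cancelling is evicted once its lease expires, as described under service renewal.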
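Fetching the registry
- When a Eureka client starts, it fetches the registry from the Eureka server and caches it locally.
- The cached registry is then re-synchronized with the Eureka server periodically (every 30 seconds by default).
- The Eureka server caches the registry information and keeps it in compressed form.
- Eureka clients and servers can communicate in JSON or XML; by default the client fetches the registry as compressed JSON.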
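To see why serving a cached, compressed payload pays off, here is a minimal sketch using the JDK's `java.util.zip` streams. It is not Eureka's response cache: `GzipPayload` is a hypothetical helper, and the JSON string is placeholder data rather than the real registry format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: compress a serialized registry once, serve the bytes many times.
final class GzipPayload {
    static byte[] gzip(String json) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(json.getBytes(StandardCharsets.UTF_8));
        } // closing the stream writes the gzip trailer
        return bytes.toByteArray();
    }

    // What a client does with a response sent with Content-Encoding: gzip.
    static String gunzip(byte[] compressed) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(gz.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}

class GzipPayloadDemo {
    public static void main(String[] args) throws IOException {
        // Placeholder payload: registry JSON is highly repetitive, which is why gzip helps.
        String json = "[" + "{\"app\":\"ORDER-SERVICE\",\"status\":\"UP\"},".repeat(200) + "{}]";
        byte[] compressed = GzipPayload.gzip(json);
        System.out.println(json.length() + " bytes -> " + compressed.length + " bytes");
        System.out.println(GzipPayload.gunzip(compressed).equals(json)); // true
    }
}
```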
Region, Zone
- Region and Zone (Availability Zone) are both AWS (Amazon Web Services) concepts.
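1.1 Principles behind service registration
- The CAP theorem states that a distributed system cannot guarantee all three of the following properties at the same time:
- Consistency: whether every node in the cluster holds the same value for a given piece of data at the same moment.
- Availability: whether the cluster as a whole can still serve client requests after some of its nodes fail.
- Partition tolerance: whether the system keeps working when the network is partitioned.
- A partition means that some nodes in the cluster cannot communicate with each other.
- Different scenarios
- Where data storage is involved, consistency must come first; ZooKeeper, for example, follows the CP design.
- For microservice governance, the core concern is service registration and discovery. If different nodes hold slightly different provider information, the consequences are not catastrophic, so availability matters more than consistency for service discovery. Eureka therefore follows the AP design.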
2 Client
- The top-level interface for client requests:
public interface EurekaHttpClient {
// register an instance (POST apps/{appName})
EurekaHttpResponse<Void> register(InstanceInfo info);
// cancel (deregister) an instance, e.g. on shutdown
EurekaHttpResponse<Void> cancel(String appName, String id);
// renew the lease (heartbeat)
EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus);
// set an overridden status for an instance
EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info);
// remove a previously set status override
EurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info);
// full registry fetch (GET apps/)
EurekaHttpResponse<Applications> getApplications(String... regions);
// delta fetch (GET apps/delta)
EurekaHttpResponse<Applications> getDelta(String... regions);
// instances registered under a VIP / secure VIP address
EurekaHttpResponse<Applications> getVip(String vipAddress, String... regions);
EurekaHttpResponse<Applications> getSecureVip(String secureVipAddress, String... regions);
// a single application
EurekaHttpResponse<Application> getApplication(String appName);
// a single instance, looked up by application and id, or by id alone
EurekaHttpResponse<InstanceInfo> getInstance(String appName, String id);
EurekaHttpResponse<InstanceInfo> getInstance(String id);
// release the underlying HTTP resources
void shutdown();
}
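Each operation of this interface corresponds to a REST call on the server. The sketch below lists the verb and relative path for several of them. The paths for register, heartbeat, full fetch and delta fetch come from the request code in these notes; the others follow Eureka's documented REST operations as we understand them. `EurekaEndpoints` and `Endpoint` are hypothetical names, and all paths are relative to the configured service URL.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical table of HTTP verb + relative path per EurekaHttpClient operation.
final class EurekaEndpoints {
    static final class Endpoint {
        final String verb;
        final String path;

        Endpoint(String verb, String path) {
            this.verb = verb;
            this.path = path;
        }

        @Override
        public String toString() {
            return verb + " " + path;
        }
    }

    static Map<String, Endpoint> forInstance(String appName, String id) {
        String instancePath = "apps/" + appName + "/" + id;
        Map<String, Endpoint> m = new LinkedHashMap<>();
        m.put("register", new Endpoint("POST", "apps/" + appName));
        m.put("cancel", new Endpoint("DELETE", instancePath));
        m.put("sendHeartBeat", new Endpoint("PUT", instancePath));
        m.put("statusUpdate", new Endpoint("PUT", instancePath + "/status")); // plus ?value=<new status>
        m.put("getApplications", new Endpoint("GET", "apps/"));
        m.put("getDelta", new Endpoint("GET", "apps/delta"));
        m.put("getApplication", new Endpoint("GET", "apps/" + appName));
        m.put("getInstance", new Endpoint("GET", instancePath));
        return m;
    }
}

class EurekaEndpointsDemo {
    public static void main(String[] args) {
        EurekaEndpoints.forInstance("ORDER-SERVICE", "host-1:order-service:8080")
                .forEach((op, ep) -> System.out.println(op + " -> " + ep));
    }
}
```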
- Implementation classes: [figure: class diagram of the EurekaHttpClient implementations, not preserved]
2.1 Service registration
- The client calls the server's registration endpoint and obtains a lease.
Client request
// AbstractJersey2EurekaHttpClient class
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
Response response = null;
try {
Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
addExtraProperties(resourceBuilder);
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.accept(MediaType.APPLICATION_JSON)
.acceptEncoding("gzip")
.post(Entity.json(info));
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
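For comparison, the same POST can be built with the JDK's own `java.net.http` client (Java 11+) instead of Jersey. This is a minimal sketch: `RegisterRequestSketch` is hypothetical, the service URL is an example value assumed to end with `/`, and the caller supplies the already-serialized instance JSON (its exact shape is not covered in these notes). The request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper mirroring AbstractJersey2EurekaHttpClient#register with java.net.http.
final class RegisterRequestSketch {
    static HttpRequest build(String serviceUrl, String appName, String instanceJson) {
        return HttpRequest.newBuilder(URI.create(serviceUrl + "apps/" + appName))
                .header("Content-Type", "application/json")  // Entity.json(info) in the Jersey code
                .header("Accept", "application/json")
                .header("Accept-Encoding", "gzip")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }
}

class RegisterRequestDemo {
    public static void main(String[] args) {
        HttpRequest req = RegisterRequestSketch.build(
                "http://localhost:8761/eureka/", "ORDER-SERVICE", "{}");
        System.out.println(req.method() + " " + req.uri()); // POST http://localhost:8761/eureka/apps/ORDER-SERVICE
    }
}
```

Sending it with `HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.discarding())` against a running server should yield status 204 on success, matching what ApplicationResource#addInstance returns.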
Server-side handling
- The server handles the registration request and replicates every registration to the peer Eureka nodes.
// ApplicationResource class
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// register the instance; isReplication is "true" when the request was forwarded by a peer node
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
- PeerAwareInstanceRegistryImpl#register: registers the application instance
// PeerAwareInstanceRegistryImpl replicates every operation handled by AbstractInstanceRegistry to the peer Eureka nodes to keep them in sync.
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// lease duration, 90s by default
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
// if the client configured its own lease duration (> 0), use it
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// register the instance in the local registry
super.register(info, leaseDuration, isReplication);
// replicate to the other Eureka servers
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// replicate the instance change to this peer node
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
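The two guards in replicateToPeers (no re-replication, no replication to yourself) can be isolated in a few lines. This is a hypothetical sketch: peers are plain URL strings, and `isReplication` stands for the flag the receiving node derives from the `PeerEurekaNode.HEADER_REPLICATION` request header.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: which peers should receive a copy of this action?
final class ReplicationTargets {
    static List<String> select(List<String> peerUrls, String myUrl, boolean isReplication) {
        List<String> targets = new ArrayList<>();
        if (isReplication) {
            // Already a copy from a peer: forwarding it again would bounce it around the cluster.
            return targets;
        }
        for (String url : peerUrls) {
            if (!url.equals(myUrl)) { // never replicate to yourself
                targets.add(url);
            }
        }
        return targets;
    }
}

class ReplicationTargetsDemo {
    public static void main(String[] args) {
        List<String> peers = List.of("http://a:8761/eureka/", "http://b:8761/eureka/", "http://c:8761/eureka/");
        String me = "http://b:8761/eureka/";
        System.out.println(ReplicationTargets.select(peers, me, false)); // [http://a:8761/eureka/, http://c:8761/eureka/]
        System.out.println(ReplicationTargets.select(peers, me, true));  // []
    }
}
```

A client registration therefore reaches every node in a single hop, and each copy stops there. In the original, an exception while replicating to one peer is logged in replicateInstanceActionsToPeers and does not stop the loop over the remaining peers.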
- AbstractInstanceRegistry#register: registers the application instance
- Called from PeerAwareInstanceRegistryImpl#register via super.register(info, leaseDuration, isReplication);
/**
* Registers a new instance with a given duration.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
// new registration: update the self-preservation counters (expectedNumberOfClientsSendingRenews, then numberOfRenewsPerMinThreshold via updateRenewsPerMinThreshold)
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) { // lease already existed: keep its service-up timestamp
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
// add to the recently-registered queue (used for debugging)
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules (compute the effective status and apply it to the instance)
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp (only the first call has an effect)
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// invalidate the response cache entries for this application and its VIP addresses
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
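The core of AbstractInstanceRegistry#register is a two-level map plus a "newer copy wins" rule on lastDirtyTimestamp. The sketch below simplifies under stated assumptions: it stores instances directly instead of `Lease<InstanceInfo>`, uses `computeIfAbsent` in place of the `putIfAbsent` sequence, and ignores locking, overridden status and caches. `RegistrySketch` and `Instance` are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RegistrySketch {
    // Hypothetical stand-in for InstanceInfo with only the fields used here.
    static final class Instance {
        final String appName;
        final String id;
        final String status;
        final long lastDirtyTimestamp;

        Instance(String appName, String id, String status, long lastDirtyTimestamp) {
            this.appName = appName;
            this.id = id;
            this.status = status;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // appName -> (instanceId -> instance)
    private final Map<String, Map<String, Instance>> registry = new ConcurrentHashMap<>();

    // Returns the copy that ends up stored.
    Instance register(Instance registrant) {
        Map<String, Instance> gMap =
                registry.computeIfAbsent(registrant.appName, k -> new ConcurrentHashMap<>());
        Instance existing = gMap.get(registrant.id);
        // Strictly greater: on equal timestamps the incoming copy wins, as in the original.
        if (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing;
        }
        gMap.put(registrant.id, registrant);
        return registrant;
    }

    Instance get(String appName, String id) {
        Map<String, Instance> gMap = registry.get(appName);
        return gMap == null ? null : gMap.get(id);
    }
}

class RegistrySketchDemo {
    public static void main(String[] args) {
        RegistrySketch r = new RegistrySketch();
        r.register(new RegistrySketch.Instance("ORDER-SERVICE", "i-1", "UP", 200));
        // a stale copy (e.g. a delayed replication) does not overwrite the newer one
        r.register(new RegistrySketch.Instance("ORDER-SERVICE", "i-1", "STARTING", 100));
        System.out.println(r.get("ORDER-SERVICE", "i-1").status); // UP
    }
}
```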
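2.2 Service discovery
- The client fetches registry information either in full or as a delta (incremental) fetch.
- By default the client first performs a full fetch to populate its local cache.
- After that, it fetches a delta every 30s by default to refresh the local cache; when the delta cannot be used, it falls back to a full fetch.
Client
- At startup, the client performs one full fetch and caches the registry locally.
- DiscoveryClient#fetchRegistry(false): not forced, but still a full fetch because the local cache is empty
- A scheduled task then fetches registry information at a fixed interval (30s by default) and refreshes the local cache.
- DiscoveryClient#refreshRegistry, which calls fetchRegistry(boolean)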
// DiscoveryClient
// local cache of Applications
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
localRegionApps.set(new Applications());
// fetch the registry from the Eureka server
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup(); // if the initial fetch fails, fall back to the backup registry
}
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// finally, initialize the scheduled tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch)
initScheduledTasks();
}
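private void initScheduledTasks() {
// excerpt: only the registry cache refresh task is shown here
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}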
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// This makes sure that a dynamic change to remote regions to fetch is honored.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
if (logger.isDebugEnabled()) {
StringBuilder allAppsHashCodes = new StringBuilder();
allAppsHashCodes.append("Local region apps hashcode: ");
allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
allAppsHashCodes.append(", is fetching remote regions? ");
allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
allAppsHashCodes.append(", Remote region: ");
allAppsHashCodes.append(entry.getKey());
allAppsHashCodes.append(" , apps hashcode: ");
allAppsHashCodes.append(entry.getValue().getAppsHashCode());
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
allAppsHashCodes);
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
- DiscoveryClient#fetchRegistry(boolean) fetches registry information from the Eureka server (a full or a delta fetch, depending on several conditions)
/**
* Fetches the registry information.
* 获取注册表信息。
* <p>
* This method tries to get only deltas after the first fetch unless there
* is an issue in reconciling eureka server and client registry information.
* </p>
*/
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
// get the locally cached applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta() // delta fetching disabled
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// full fetch
getAndStoreFullRegistry();
} else {
// delta fetch
getAndUpdateDelta(applications);
}
// set the applications hash code
applications.setAppsHashCode(applications.getReconcileHashCode());
// log the number of locally cached instances
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
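The full-versus-delta decision at the top of fetchRegistry boils down to one boolean expression. A hypothetical sketch: the parameters stand in for the config getters and the cached `Applications` object, with `cachedAppCount == null` playing the role of `applications == null`.

```java
// Hypothetical condensed form of the condition in DiscoveryClient#fetchRegistry(boolean).
final class FetchDecision {
    enum Mode { FULL, DELTA }

    static Mode decide(boolean deltaDisabled,
                       String singleVipAddress,
                       boolean forceFullRegistryFetch,
                       Integer cachedAppCount,   // null: nothing cached yet
                       long cachedVersion) {     // -1: cached data does not support deltas
        boolean full = deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFullRegistryFetch
                || cachedAppCount == null
                || cachedAppCount == 0
                || cachedVersion == -1;
        return full ? Mode.FULL : Mode.DELTA;
    }
}

class FetchDecisionDemo {
    public static void main(String[] args) {
        // First start: empty cache, so the non-forced fetchRegistry(false) still fetches everything.
        System.out.println(FetchDecision.decide(false, null, false, null, -1)); // FULL
        // Periodic refresh with a populated cache.
        System.out.println(FetchDecision.decide(false, null, false, 12, 5));    // DELTA
        // Remote regions changed: refreshRegistry passes true.
        System.out.println(FetchDecision.decide(false, null, true, 12, 5));     // FULL
    }
}
```

Even when DELTA is chosen, getAndUpdateDelta still falls back to a full fetch if the server returns no delta.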
Full fetch
// DiscoveryClient
/**
* Gets the full registry information from the eureka server and stores it locally.
* When applying the full registry, the following flow is observed:
* if (update generation have not advanced (due to another thread))
* atomically set the registry to the new registry
* fi
*
* @return the full registry information.
* @throws Throwable
* on error.
*/
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
// fetch the full registry (or only the configured single VIP address)
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// store it in the local cache (after filterAndShuffle)
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
// AbstractJersey2EurekaHttpClient
public EurekaHttpResponse<Applications> getApplications(String... regions) {
return getApplicationsInternal("apps/", regions);
}
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
Response response = null;
try {
WebTarget webTarget = jerseyClient.target(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
webTarget = webTarget.queryParam("regions", StringUtil.join(regions));
}
Builder requestBuilder = webTarget.request();
addExtraProperties(requestBuilder);
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get();
Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
applications = response.readEntity(Applications.class);
}
return anEurekaHttpResponse(response.getStatus(), applications).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey2 HTTP GET {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
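Both getAndStoreFullRegistry and getAndUpdateDelta read `fetchRegistryGeneration` before the remote call and only publish if a compare-and-set on it succeeds, so a slow fetch cannot overwrite a result published in the meantime. A hypothetical, generic sketch of that pattern (`GenerationGuardedCache` is our name, not Eureka's):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the fetchRegistryGeneration guard.
final class GenerationGuardedCache<T> {
    private final AtomicLong generation = new AtomicLong();
    private final AtomicReference<T> value = new AtomicReference<>();

    // Step 1: remember the generation before starting the (slow) remote call.
    long beginFetch() {
        return generation.get();
    }

    // Step 2: publish only if the generation is still the one observed in step 1.
    boolean tryPublish(long observedGeneration, T fetched) {
        if (fetched == null) {
            return false; // like "The application is null for some reason"
        }
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            value.set(fetched);
            return true;
        }
        return false; // another fetch published first; drop this result
    }

    T get() {
        return value.get();
    }
}

class GenerationDemo {
    public static void main(String[] args) {
        GenerationGuardedCache<String> cache = new GenerationGuardedCache<>();
        long slow = cache.beginFetch();  // fetch A starts
        long fast = cache.beginFetch();  // fetch B starts, same generation
        System.out.println(cache.tryPublish(fast, "B")); // true: B wins the CAS
        System.out.println(cache.tryPublish(slow, "A")); // false: A is stale
        System.out.println(cache.get());                 // B
    }
}
```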
Delta fetch
// DiscoveryClient
/**
* Get the delta registry information from the eureka server and update it locally.
*
* @return the client response
* @throws Throwable on error
*/
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
// fetch the delta
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// no usable delta: fall back to a full fetch
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
// full fetch instead of the delta
getAndStoreFullRegistry();
// a delta was obtained: merge it into the local cache, then compare hash codes
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// merge the changed applications into the locally cached ones
updateDelta(delta);
// compute the local reconcile hash code
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
// AbstractJersey2EurekaHttpClient
public EurekaHttpResponse<Applications> getDelta(String... regions) {
return getApplicationsInternal("apps/delta", regions);
}
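updateDelta (not shown in these notes) merges the delta into the local cache. Assuming each delta entry carries an action type with the values ADDED, MODIFIED and DELETED (the registration code above sets `ActionType.ADDED`), the merge can be sketched as below. `DeltaMerge`, `Change` and the flat instanceId → status map are hypothetical simplifications.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of merging a delta into a locally cached instanceId -> status map.
final class DeltaMerge {
    enum Action { ADDED, MODIFIED, DELETED }

    static final class Change {
        final Action action;
        final String instanceId;
        final String status;

        Change(Action action, String instanceId, String status) {
            this.action = action;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    static void apply(Map<String, String> local, List<Change> delta) {
        for (Change c : delta) {
            switch (c.action) {
                case ADDED:
                case MODIFIED:
                    local.put(c.instanceId, c.status); // add or replace
                    break;
                case DELETED:
                    local.remove(c.instanceId);
                    break;
            }
        }
    }
}

class DeltaMergeDemo {
    public static void main(String[] args) {
        Map<String, String> local = new TreeMap<>(Map.of("i-1", "UP", "i-2", "UP"));
        DeltaMerge.apply(local, List.of(
                new DeltaMerge.Change(DeltaMerge.Action.MODIFIED, "i-2", "DOWN"),
                new DeltaMerge.Change(DeltaMerge.Action.ADDED, "i-3", "UP"),
                new DeltaMerge.Change(DeltaMerge.Action.DELETED, "i-1", null)));
        System.out.println(local); // {i-2=DOWN, i-3=UP}
    }
}
```

After merging, the client compares its own reconcile hash code with `delta.getAppsHashCode()`; a mismatch triggers reconcileAndLogDifference, which makes a remote call.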
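- Reconcile ("consistency") hash code
- Formula: appsHashCode is the concatenation, over all instance statuses in sorted order, of {status}_{count}_
- For example, with 12 UP and 12 DOWN instances, appsHashCode = DOWN_12_UP_12_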
// DiscoveryClient
private String getReconcileHashCode(Applications applications) {
// instance count per status (key: instance status)
TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap<String, AtomicInteger>();
if (isFetchingRemoteRegionRegistries()) {
for (Applications remoteApp : remoteRegionVsApps.values()) {
remoteApp.populateInstanceCountMap(instanceCountMap);
}
}
applications.populateInstanceCountMap(instanceCountMap);
return Applications.getReconcileHashCode(instanceCountMap);
}
// Applications
public static String getReconcileHashCode(Map<String, AtomicInteger> instanceCountMap) {
StringBuilder reconcileHashCode = new StringBuilder(75);
for (Map.Entry<String, AtomicInteger> mapEntry : instanceCountMap.entrySet()) {
reconcileHashCode.append(mapEntry.getKey()).append(STATUS_DELIMITER).append(mapEntry.getValue().get())
.append(STATUS_DELIMITER);
}
return reconcileHashCode.toString();
}
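The worked example (12 UP, 12 DOWN → `DOWN_12_UP_12_`) can be reproduced with a standalone version of the same counting logic. A hypothetical sketch: `ReconcileHash` takes a flat list of instance statuses and assumes `STATUS_DELIMITER` is `_`, as the example output suggests.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ReconcileHash {
    static String of(List<String> instanceStatuses) {
        // TreeMap keeps statuses sorted, so the result does not depend on iteration order.
        TreeMap<String, AtomicInteger> counts = new TreeMap<>();
        for (String status : instanceStatuses) {
            counts.computeIfAbsent(status, k -> new AtomicInteger()).incrementAndGet();
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, AtomicInteger> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue().get()).append('_');
        }
        return sb.toString();
    }
}

class ReconcileHashDemo {
    public static void main(String[] args) {
        List<String> statuses = new ArrayList<>(Collections.nCopies(12, "UP"));
        statuses.addAll(Collections.nCopies(12, "DOWN"));
        System.out.println(ReconcileHash.of(statuses)); // DOWN_12_UP_12_
    }
}
```

Because only per-status counts enter the code, two registries holding different instances with the same counts produce the same value: the check catches drift in counts, not every possible difference.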
Server
- ApplicationsResource is the Resource (i.e. controller) that handles both full and delta fetch requests.
Full fetch
- A full fetch request is mapped to ApplicationsResource#getContainers()
// ApplicationsResource
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// Check if the server allows the access to the registry. The server can
// restrict access if it is not
// ready to serve traffic depending on various reasons.
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
// API version
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
// response-cache key; it includes the response format (JSON/XML) chosen above
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
}
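Two details of getContainers decide which cached entry is served: the region list is normalized, and the Accept header selects JSON or XML. A hypothetical sketch of both, assuming `HEADER_JSON_VALUE` is the substring `json` (its value is not shown in these notes).

```java
import java.util.Arrays;
import java.util.Locale;

final class CacheKeyParts {
    // Lower-case, split on ',' and sort, so "us-west-2,US-EAST-1" and
    // "us-east-1,us-west-2" map to the same cache entry.
    static String[] normalizeRegions(String regionsStr) {
        if (regionsStr == null || regionsStr.isEmpty()) {
            return null; // no remote regions requested
        }
        String[] regions = regionsStr.toLowerCase(Locale.ROOT).split(",");
        Arrays.sort(regions);
        return regions;
    }

    // JSON only if the Accept header asks for it; otherwise XML.
    static String mediaType(String acceptHeader) {
        return acceptHeader != null && acceptHeader.contains("json")
                ? "application/json"
                : "application/xml";
    }
}

class CacheKeyPartsDemo {
    public static void main(String[] args) {
        System.out.println(Arrays.toString(CacheKeyParts.normalizeRegions("us-west-2,US-EAST-1"))); // [us-east-1, us-west-2]
        System.out.println(CacheKeyParts.mediaType("application/json")); // application/json
        System.out.println(CacheKeyParts.mediaType(null));               // application/xml
    }
}
```

The original calls `toLowerCase()` with the default locale; the sketch uses `Locale.ROOT` so the result does not depend on the server's locale settings.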
Delta fetch
- A delta fetch request is mapped to ApplicationsResource#getContainerDifferential()
/**
* Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}.
*/
@Path("delta")
@GET
public Response getContainerDifferential(
@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
// If the delta flag is disabled in discovery or if the lease expiration
// has been disabled, redirect clients to get all instances
if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
return Response.status(Status.FORBIDDEN).build();
}
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL_DELTA.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
if (acceptEncoding != null
&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {
return Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
return Response.ok(responseCache.get(cacheKey))
.build();
}
}
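2.3 Service renewal
- After the client registers its instance with the server, it holds a lease.
- The client runs two scheduled tasks:
- one renews the lease with the server at a fixed interval so that it does not expire;
- the other fetches registry information from the server at a fixed interval to refresh the local cache.
- Defaults:
- lease duration 90s, renewal interval 30s;
- the 3:1 ratio leaves room for three renewal attempts before the lease expires, e.g. when the network is flaky;
- registry fetch interval 30s.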
Client
- During initialization the client creates the heartbeat scheduled task, which renews the lease at a fixed interval.
// DiscoveryClient
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// finally, initialize the scheduled tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch)
initScheduledTasks();
}
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// heartbeat (lease renewal) task towards the Eureka server
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
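Both timers above are wrapped in TimedSupervisorTask together with an exponential back-off bound (expBackOffBound). The following is a minimal sketch of that delay bookkeeping. It assumes the delay doubles after each timed-out run, is capped at interval × expBackOffBound, and resets to the base interval once a run finishes in time. BackoffDelay is a hypothetical class, not Eureka's API, and 30 s with a bound of 10 are illustrative values.

```java
// Hypothetical model of the back-off delay kept by a self-rescheduling
// supervisor task: start at the base interval, double on every timeout
// (capped at base * expBackOffBound), reset after a successful run.
final class BackoffDelay {
    private final long baseSeconds;
    private final long maxSeconds;
    private long currentSeconds;

    BackoffDelay(long baseSeconds, int expBackOffBound) {
        this.baseSeconds = baseSeconds;
        this.maxSeconds = baseSeconds * expBackOffBound;
        this.currentSeconds = baseSeconds;
    }

    /** The wrapped task timed out: returns the (doubled, capped) delay before the next run. */
    long onTimeout() {
        currentSeconds = Math.min(maxSeconds, currentSeconds * 2);
        return currentSeconds;
    }

    /** The wrapped task finished in time: the delay goes back to the base interval. */
    long onSuccess() {
        currentSeconds = baseSeconds;
        return currentSeconds;
    }
}
```

With these values, a heartbeat that keeps timing out is rescheduled after 60, 120 and 240 seconds, then at most every 300 seconds. An unreachable server is therefore not hit every 30 seconds.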
- Lease renewal (heartbeat) thread
// DiscoveryClient
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
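The branch logic of renew() reduces to a small decision function. A 404 means the server has lost this instance, so the client re-registers and reports the result of that registration. Only a 200 counts as a successful renewal. RenewDecision and its parameters are hypothetical names used for illustration. The real method also catches exceptions and returns false.

```java
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model of the branches in DiscoveryClient#renew().
final class RenewDecision {
    static final int OK = 200;
    static final int NOT_FOUND = 404;

    static boolean renew(int heartbeatStatus, BooleanSupplier register) {
        if (heartbeatStatus == NOT_FOUND) {
            // The server does not know this instance: re-register,
            // and let the registration result decide success
            return register.getAsBoolean();
        }
        // Any other status except 200 is treated as a failed renewal
        return heartbeatStatus == OK;
    }
}
```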
// AbstractJerseyEurekaHttpClient
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.put(ClientResponse.class);
EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
if (response.hasEntity()) {
eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
}
return eurekaResponseBuilder.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
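For reference, sendHeartBeat() issues a PUT to apps/{appName}/{id} on the service URL. The request carries status and lastDirtyTimestamp query parameters and an optional overriddenstatus. The hypothetical helper HeartbeatUrl only assembles that URL string. Path segments are not encoded, and the service URL and instance ID in the test are made up.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds {serviceUrl}/apps/{appName}/{id}?status=..&lastDirtyTimestamp=..[&overriddenstatus=..]
final class HeartbeatUrl {
    static String build(String serviceUrl, String appName, String id,
                        String status, long lastDirtyTimestamp, String overriddenStatus) {
        StringBuilder sb = new StringBuilder(serviceUrl);
        if (!serviceUrl.endsWith("/")) {
            sb.append('/');
        }
        sb.append("apps/").append(appName).append('/').append(id)
          .append("?status=").append(enc(status))
          .append("&lastDirtyTimestamp=").append(lastDirtyTimestamp);
        if (overriddenStatus != null) {
            // Only sent when an overridden status is known, as in the excerpt
            sb.append("&overriddenstatus=").append(enc(overriddenStatus));
        }
        return sb.toString();
    }

    // Query-parameter encoding (Java 10+ overload taking a Charset)
    private static String enc(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```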
Server side
- Accepts the renewal request and replicates the renewal to peer Eureka nodes
// InstanceResource
/**
* A put request for renewing lease from a client instance.
*/
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// renew the lease
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response = null;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
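renewLease() delegates the timestamp comparison to validateDirtyTimestamp(), which is not shown here. This is a simplified model under the assumption that the method compares the caller's lastDirtyTimestamp with the registry's copy:

- A newer caller gets 404 (re-register).
- An older replicating peer gets 409 so it can sync.
- Everything else gets 200.

DirtyTimestampCheck is a hypothetical name. Treat the exact status mapping as an assumption, not a statement about every Eureka version.

```java
// Hypothetical model of the dirty-timestamp check run after a successful renewal.
// ASSUMPTION: mirrors the intent of validateDirtyTimestamp(), not its exact code.
final class DirtyTimestampCheck {
    static final int OK = 200;
    static final int NOT_FOUND = 404; // caller has newer data: ask it to re-register
    static final int CONFLICT = 409;  // registry has newer data: the replicating peer should sync

    static int validate(long callerTimestamp, long registryTimestamp, boolean isReplication) {
        if (callerTimestamp > registryTimestamp) {
            return NOT_FOUND;
        }
        if (callerTimestamp < registryTimestamp && isReplication) {
            return CONFLICT;
        }
        return OK;
    }
}
```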
// PeerAwareInstanceRegistryImpl
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
// replicate the heartbeat to peer Eureka nodes, just like a registration
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
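replicateToPeers() is called with the isReplication flag, and that flag is what keeps replication from looping between peers. A heartbeat received from a client is forwarded; a heartbeat received from a peer is not. This toy model of the rule uses a hypothetical PeerNode class that counts heartbeats instead of renewing real leases.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of peer-aware renewal.
final class PeerNode {
    final List<PeerNode> peers = new ArrayList<>();
    int heartbeatsReceived = 0;

    boolean renew(String appName, String id, boolean isReplication) {
        heartbeatsReceived++; // stands in for the local lease renewal
        if (!isReplication) {
            // Only client-originated heartbeats are forwarded, marked as replication,
            // so peers never forward them again (no A -> B -> A loop)
            for (PeerNode peer : peers) {
                peer.renew(appName, id, true);
            }
        }
        return true;
    }
}
```

A client heartbeat sent to one node reaches every node in a fully connected cluster exactly once.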
- AbstractInstanceRegistry#renew: renews the lease of an application instance
/**
* Marks the given instance of the given app name as renewed, and also marks whether it originated from
* replication.
*
* @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
*/
public boolean renew(String appName, String id, boolean isReplication) {
// count the renewal for monitoring
RENEW.increment(isReplication);
// look up the lease
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
// the lease does not exist
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
// count renewals in the last minute (used by self-preservation, statistics and the dashboard)
renewsLastMin.increment();
// renew: lastUpdateTimestamp = System.currentTimeMillis() + duration;
leaseToRenew.renew();
return true;
}
}
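The lease timestamps can be modelled with an injectable clock. This sketch follows the comment in renew() above (lastUpdateTimestamp = now + duration). It also assumes isExpired() checks now > lastUpdateTimestamp + duration + additionalLeaseMs, as in Eureka's Lease class, which is not shown in this excerpt. SimpleLease is a hypothetical name. Under these assumptions, an instance that stops renewing right after a renewal is treated as expired only after about twice the lease duration (about 180 s with the default 90 s).

```java
// Hypothetical lease model with an explicit clock (milliseconds).
final class SimpleLease {
    private final long durationMs;
    private long lastUpdateTimestamp;

    SimpleLease(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs; // assumed initial value at registration
    }

    void renew(long nowMs) {
        // As in the excerpt: the timestamp is pushed one duration into the future
        lastUpdateTimestamp = nowMs + durationMs;
    }

    // ASSUMPTION: same formula as Eureka's Lease#isExpired (ignoring evictionTimestamp)
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }
}
```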
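2.4 Service Deregistration
- When an application instance shuts down, the Eureka client asks the Eureka server to take the instance offline. Two conditions must hold: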
- eureka.registration.enabled = true: the instance has registration enabled. Defaults to true.
- eureka.shouldUnregisterOnShutdown = true: the instance deregisters when it shuts down. Defaults to true.
Client side
- Shut down the client and send a deregistration request to the server
// DiscoveryClient
/**
* Shuts down Eureka Client. Also sends a deregistration request to the
* eureka server.
*
*/
@PreDestroy // release resources when the container shuts down
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka() // eureka.registration.enabled = true
&& clientConfig.shouldUnregisterOnShutdown()) { // eureka.shouldUnregisterOnShutdown = true
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
/**
* unregister w/ the eureka service.
*/
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
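The control flow of shutdown() reduces to an idempotency guard plus two configuration checks. This is a minimal sketch with a hypothetical ClientShutdown class, where a counter stands in for the cancel request sent by unregister().

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of DiscoveryClient#shutdown().
final class ClientShutdown {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final boolean registerWithEureka;   // eureka.registration.enabled
    private final boolean unregisterOnShutdown; // eureka.shouldUnregisterOnShutdown
    int cancelRequestsSent = 0;                 // stands in for registrationClient.cancel(...)

    ClientShutdown(boolean registerWithEureka, boolean unregisterOnShutdown) {
        this.registerWithEureka = registerWithEureka;
        this.unregisterOnShutdown = unregisterOnShutdown;
    }

    synchronized void shutdown() {
        // Only the first call passes compareAndSet; later calls are no-ops
        if (isShutdown.compareAndSet(false, true)) {
            // scheduled tasks (heartbeat, cache refresh) would be cancelled here
            if (registerWithEureka && unregisterOnShutdown) {
                cancelRequestsSent++;
            }
        }
    }
}
```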
// AbstractJersey2EurekaHttpClient
public EurekaHttpResponse<Void> cancel(String appName, String id) {
String urlPath = "apps/" + appName + '/' + id;
Response response = null;
try {
Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
addExtraProperties(resourceBuilder);
addExtraHeaders(resourceBuilder);
response = resourceBuilder.delete();
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey2 HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
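The same DELETE apps/{appName}/{id} call can be expressed with the JDK 11+ java.net.http client instead of Jersey. This hypothetical CancelRequest helper only builds the request, since sending it would need a running Eureka server. The service URL and IDs are placeholders.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical: the deregistration request built with java.net.http (Java 11+).
final class CancelRequest {
    static HttpRequest build(String serviceUrl, String appName, String id) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return HttpRequest.newBuilder(URI.create(base + "apps/" + appName + "/" + id))
                .DELETE() // same HTTP method the Jersey client uses above
                .build();
    }
}
```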
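Server side
- The server accepts cancellation requests, and a scheduled task also evicts expired leases
- 1 InstanceResource accepts the cancellation request
- 2 AbstractInstanceRegistry.EvictionTask: scheduled task that evicts expired leases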
-
1 InstanceResource accepts the cancellation request
// InstanceResource
/**
* Handles cancellation of leases for this particular instance.
*/
@DELETE
public Response cancelLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
try {
boolean isSuccess = registry.cancel(app.getName(), id,
"true".equals(isReplication));
if (isSuccess) {
logger.debug("Found (Cancel): {} - {}", app.getName(), id);
return Response.ok().build();
} else {
logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
} catch (Throwable e) {
logger.error("Error (cancel): {} - {}", app.getName(), id, e);
return Response.serverError().build();
}
}
// PeerAwareInstanceRegistryImpl
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
if (super.cancel(appName, id, isReplication)) {
// replicate the cancellation to peer Eureka nodes
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to cancel it, reduce the number of clients to send renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
updateRenewsPerMinThreshold();
}
}
return true;
}
return false;
}
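- AbstractInstanceRegistry.updateRenewsPerMinThreshold, for reference
// update the threshold of renewals per minute expected from client instances
protected void updateRenewsPerMinThreshold() {
// Renews threshold = total instances * (60 / renewal interval) * self-preservation renewal percent threshold
// Renews (last min) = total instances * (60 / renewal interval)
// numberOfRenewsPerMinThreshold: minimum renewals per minute the Eureka server expects from client instances
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews // expected number of clients sending renewals (total instances)
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) // expected client renewal interval in seconds (default 30)
* serverConfig.getRenewalPercentThreshold()); // self-preservation renewal percent threshold (default 0.85)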
}
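This is a worked example of the formula. The hypothetical RenewThreshold class also mirrors the bookkeeping in cancel() above. As a simplification, onRegister() always increments the expected count, whereas the real registry code may guard this differently. With 10 instances and the defaults (30 s interval, 0.85), the threshold is 10 × (60 / 30) × 0.85 = 17 renewals per minute. After one cancellation it becomes (int) (9 × 2 × 0.85) = (int) 15.3 = 15.

```java
// Hypothetical model of the self-preservation threshold bookkeeping.
final class RenewThreshold {
    private final int expectedClientRenewalIntervalSeconds; // default 30
    private final double renewalPercentThreshold;           // default 0.85
    int expectedNumberOfClientsSendingRenews = 0;
    int numberOfRenewsPerMinThreshold = 0;

    RenewThreshold(int expectedClientRenewalIntervalSeconds, double renewalPercentThreshold) {
        this.expectedClientRenewalIntervalSeconds = expectedClientRenewalIntervalSeconds;
        this.renewalPercentThreshold = renewalPercentThreshold;
    }

    // Simplification: every registration counts as one more renewing client
    void onRegister() {
        expectedNumberOfClientsSendingRenews++;
        updateRenewsPerMinThreshold();
    }

    // Mirrors cancel(): decrement only while the count is positive
    void onCancel() {
        if (expectedNumberOfClientsSendingRenews > 0) {
            expectedNumberOfClientsSendingRenews--;
            updateRenewsPerMinThreshold();
        }
    }

    private void updateRenewsPerMinThreshold() {
        numberOfRenewsPerMinThreshold = (int) (expectedNumberOfClientsSendingRenews
                * (60.0 / expectedClientRenewalIntervalSeconds)
                * renewalPercentThreshold);
    }
}
```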
-
2 AbstractInstanceRegistry.EvictionTask: scheduled task that evicts expired leases
- AbstractInstanceRegistry.EvictionTask evicts expired leases
// AbstractInstanceRegistry
// task that evicts expired leases
private final AtomicReference<EvictionTask> evictionTaskRef = new AtomicReference<EvictionTask>();
// called during initialization
protected void postInit() {
renewsLastMin.start();
// (re)schedule the eviction task
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(), // initial delay, default 60 s
serverConfig.getEvictionIntervalTimerInMs()); // period, default 60 s
}
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);
@Override
public void run() {
try {
// compensation time in ms = current time - last execution time - execution interval
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
}
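The compensation time keeps the eviction task from penalizing leases for time when the task itself was not running, for example when a GC pause delays the timer. It is passed to evict() and added to the expiry check through lease.isExpired(additionalLeaseMs). This sketch of the formula in the comment assumes the result is clamped at 0 and is 0 on the first run. The field name lastExecutionNanosRef suggests nanosecond timestamps, but this hypothetical Compensation class takes milliseconds so the arithmetic is easy to check.

```java
// Hypothetical model of EvictionTask's compensation time (milliseconds).
final class Compensation {
    private final long evictionIntervalMs;
    private long lastExecutionMs = 0L; // 0 means "no previous run"

    Compensation(long evictionIntervalMs) {
        this.evictionIntervalMs = evictionIntervalMs;
    }

    /** Records a run at nowMs and returns how late it is relative to the schedule. */
    long next(long nowMs) {
        long last = lastExecutionMs;
        lastExecutionMs = nowMs;
        if (last == 0L) {
            return 0L; // first run: nothing to compensate
        }
        long elapsedMs = nowMs - last;
        // assumed clamp: an early or on-time run needs no compensation
        return Math.max(0L, elapsedMs - evictionIntervalMs);
    }
}
```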
- Evicting expired leases
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
// if lease expiration is disabled (returns false), self-preservation is in effect and nothing is evicted
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not do that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
// collect all expired leases
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
// compute the maximum number of leases that may be evicted
int registrySize = (int) getLocalRegistrySize();
// renewalPercentThreshold: self-preservation threshold, default 0.85
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
// number of leases to evict in this run
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
// evict expired leases in random order, so the impact is spread evenly across all applications
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
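The selection step of evict() can be pulled out into a standalone function. It caps evictions at registrySize - (int) (registrySize × renewalPercentThreshold), then chooses that many expired leases with a partial Fisher-Yates (Knuth) shuffle, the same swap pattern used in the loop above. EvictionPicker is a hypothetical helper that copies the list instead of shuffling the caller's. With 100 registered instances and the default 0.85, at most 15 leases are evicted per run, however many have expired.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical re-statement of the selection step in evict().
final class EvictionPicker {
    static <T> List<T> pick(List<T> expired, int registrySize,
                            double renewalPercentThreshold, Random random) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        int evictionLimit = registrySize - registrySizeThreshold;
        int toEvict = Math.min(expired.size(), evictionLimit);

        List<T> work = new ArrayList<>(expired); // do not mutate the caller's list
        List<T> chosen = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            // Partial Knuth shuffle: pick a random remaining element and move it to slot i
            int next = i + random.nextInt(work.size() - i);
            Collections.swap(work, i, next);
            chosen.add(work.get(i));
        }
        return chosen;
    }
}
```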
-
3 Self-Preservation Mechanism
- When a network failure makes the server lose many clients within a short time, it enters self-preservation and stops evicting (taking offline) instances.
- Trigger
- numberOfRenewsPerMinThreshold <= 0, or renewals received in the last minute <= numberOfRenewsPerMinThreshold
- Computing numberOfRenewsPerMinThreshold (minimum expected renewals per minute)
- expectedNumberOfClientsSendingRenews * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) * serverConfig.getRenewalPercentThreshold()
- With default parameters (30 s renewal interval, 0.85 threshold)
- expectedNumberOfClientsSendingRenews * 2 * 0.85
- Computing expectedNumberOfClientsSendingRenews (expected number of clients sending renewals)
- Maintained on registration and cancellation; renewals do not change it
// AbstractInstanceRegistry
// minimum expected renewals per minute
protected volatile int numberOfRenewsPerMinThreshold;
// expected number of clients sending renewals
protected volatile int expectedNumberOfClientsSendingRenews;
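// update the threshold of renewals per minute expected from client instances
protected void updateRenewsPerMinThreshold() {
// Renews threshold = total instances * (60 / renewal interval) * self-preservation renewal percent threshold
// Renews (last min) = total instances * (60 / renewal interval)
// numberOfRenewsPerMinThreshold: minimum renewals per minute the Eureka server expects from client instances
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews // expected number of clients sending renewals (total instances)
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) // expected client renewal interval in seconds (default 30)
* serverConfig.getRenewalPercentThreshold()); // self-preservation renewal percent threshold (default 0.85)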
}
// PeerAwareInstanceRegistryImpl
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
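This is isLeaseExpirationEnabled() as a pure function, with hypothetical parameter names standing in for the registry fields. As a worked example under default settings, 10 instances give a threshold of 17. If only 8 of them keep renewing, the server sees 8 × 2 = 16 renewals per minute. Since 16 is not greater than 17, expiration is disabled and self-preservation keeps the 2 silent instances registered.

```java
// Hypothetical pure-function version of PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled().
final class SelfPreservation {
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            int numberOfRenewsPerMinThreshold,
                                            long renewsInLastMin) {
        if (!selfPreservationEnabled) {
            return true; // self-preservation switched off: always allow eviction
        }
        // Evict only when the threshold is positive and renewals exceed it
        return numberOfRenewsPerMinThreshold > 0 && renewsInLastMin > numberOfRenewsPerMinThreshold;
    }
}
```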
Reference: 芋道源码