上一篇文章《Eureka源码之Client端注册》分析了Client端注册的整体流程,本文将基于Server端源码,分析Server端是如何处理客户端注册请求的。
注册时序图
注册时序图源码
ApplicationResource
Jersey是一个轻量级的RESTful框架,可以进一步简化RESTful service和client的开发;ApplicationResource就是基于Jersey实现的资源类,负责处理客户端发来的注册等请求。
/**
 * Jersey (JAX-RS) resource for a single application, identified by {@code appName}.
 * Its POST handler {@link #addInstance} receives instance registrations and delegates them
 * to the {@link PeerAwareInstanceRegistry}.
 *
 * <p>{@code @Produces} declares the MIME types of response bodies (XML and JSON).
 */
@Produces({"application/xml", "application/json"})
public class ApplicationResource {
// Application name, upper-cased in the constructor.
private final String appName;
// Eureka server configuration (used here to read the experimental validation flag).
private final EurekaServerConfig serverConfig;
// Peer-aware registry that performs the actual registration (and its replication).
private final PeerAwareInstanceRegistry registry;
// Response cache obtained from the registry; not used in the visible part of this excerpt.
private final ResponseCache responseCache;
/**
 * Creates the resource for one application.
 *
 * @param appName      application name; stored upper-cased
 * @param serverConfig server configuration
 * @param registry     registry to delegate to; also supplies the response cache
 */
ApplicationResource(String appName,
EurekaServerConfig serverConfig,
PeerAwareInstanceRegistry registry) {
this.appName = appName.toUpperCase();
this.serverConfig = serverConfig;
this.registry = registry;
this.responseCache = registry.getResponseCache();
}
/** Returns the (upper-cased) application name this resource serves. */
public String getAppName() {
return appName;
}
/**
 * Registers information about a particular instance for an application (HTTP POST).
 *
 * @param info          the instance being registered
 * @param isReplication value of the {@code PeerEurekaNode.HEADER_REPLICATION} header;
 *                      the exact string "true" marks a request replicated from another
 *                      server node, anything else (including a missing header) means a
 *                      direct client registration
 * @return 204 No Content on success; 400 when the DataCenterInfo id is blank and the
 *         experimental "registration.validation.dataCenterInfoId" check is enabled
 *         (further 400 responses come from the required-field checks elided below)
 */
@POST
// @Consumes declares the MIME types accepted for the request body (JSON and XML).
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
// Log the incoming registration.
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// (Elided in this excerpt) non-null checks on required instance fields; failures return 400.
...
// Handle clients that may have registered with an unsuitable DataCenterInfo whose
// unique id is missing (blank).
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
// Strict mode is opt-in via the experimental config key (case-insensitive "true").
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
// Strict mode: reject the registration outright.
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
// Lenient mode for AWS: back-fill the instanceId metadata from the instance id
// if it is absent.
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
// Lenient mode for other types: accept the registration but warn.
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// Delegate the registration to PeerAwareInstanceRegistry (PeerAwareInstanceRegistryImpl
// in the accompanying text). String.equals(null) is false, so a missing header means
// "not a replication".
registry.register(info, "true".equals(isReplication));
// Registration succeeded: 204 No Content.
return Response.status(204).build();
}
}
PeerAwareInstanceRegistryImpl
单例的对等实例注册处理器,负责将注册相关操作(主要有注册,心跳续约,取消,状态改变以及过期)复制到其他对等节点,以使节点之间保持同步
一个Eureka Server只有一个PeerAwareInstanceRegistryImpl实例
- UML类图
- 类源码
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
// volatile 修饰的集群对等节点
protected volatile PeerEurekaNodes peerEurekaNodes;
/**
 * Registry operations that can be replicated to peer Eureka nodes.
 *
 * <p>Each constant owns a servo timer named after the constant. {@code replicateToPeers}
 * starts it for the duration of a replication of that action.
 */
public enum Action {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

    // Created once per constant during enum initialisation and never reassigned, hence final.
    private final com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());

    /** Returns the timer that tracks the time spent replicating this action. */
    public com.netflix.servo.monitor.Timer getTimer() {
        return this.timer;
    }
}
// 内部属性以及其他方法省略
...
/**
 * Registers the given instance in this node's registry, then replicates the registration
 * to every peer Eureka node.
 *
 * <p>Registrations that themselves arrived as replication events from another node are not
 * replicated again.
 *
 * @param info          the {@link InstanceInfo} to register and replicate
 * @param isReplication {@code true} if this call is a replication event from another
 *                      replica node, {@code false} otherwise
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // The lease duration is how long the server tolerates missing heartbeats before evicting
    // the instance. Honour the client-supplied value only when it is present and positive;
    // otherwise fall back to Lease.DEFAULT_DURATION_IN_SECS.
    final boolean hasCustomDuration =
            info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
    final int leaseDuration = hasCustomDuration
            ? info.getLeaseInfo().getDurationInSecs()
            : Lease.DEFAULT_DURATION_IN_SECS;

    // Record the registration locally first...
    super.register(info, leaseDuration, isReplication);
    // ...then fan it out to the rest of the cluster (replicateToPeers skips replicated calls).
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
/**
 * Replicates an Eureka action to every peer node except this node itself.
 *
 * <p>Nothing is replicated in two cases:
 * <ul>
 *   <li>the call is itself a replication event, because re-replicating would create a
 *       "poison" replication loop between nodes;</li>
 *   <li>the peer set is {@code null} (not assigned yet).</li>
 * </ul>
 * The elapsed time is recorded on the action's timer.
 *
 * @param action        the operation to replicate
 * @param appName       application name of the affected instance
 * @param id            id of the affected instance
 * @param info          instance info, used for {@link Action#Register}; may be {@code null}
 * @param newStatus     new status, used for {@link Action#StatusUpdate}; may be {@code null}
 * @param isReplication {@code true} if this call came from another node's replication
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    // Time this replication with the per-action timer.
    Stopwatch tracer = action.getTimer().start();
    try {
        // Count replication traffic received from other nodes.
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // Read the volatile field once so the guard and the loop see the same snapshot.
        // The previous guard compared this PeerEurekaNodes reference with
        // Collections.EMPTY_LIST, which is never the same object, so that check was dead code.
        // A null peer set would then throw NullPointerException in the loop below.
        final PeerEurekaNodes nodes = peerEurekaNodes;
        // If it is a replication already, do not replicate again as this would create a
        // poison replication.
        if (isReplication || nodes == null) {
            return;
        }
        for (final PeerEurekaNode node : nodes.getPeerEurekaNodes()) {
            // If the URL represents this host, do not replicate to yourself.
            if (nodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
/**
 * Replicates one instance action to a single peer node.
 *
 * <p>Every action is ultimately delegated to the matching {@link PeerEurekaNode} method. The
 * heartbeat, status-update and delete-override actions first look up the instance currently
 * held in this registry and pass it along. Any failure is logged and swallowed, so the
 * caller's loop goes on to the remaining peers.
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        // NOTE(review): CurrentRequestVersion is set to V2 here and not reset afterwards;
        // confirm that this is intended.
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Register:
                node.register(info);
                break;
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat: {
                final InstanceStatus overridden = overriddenInstanceStatusMap.get(id);
                final InstanceInfo current = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, current, overridden, false);
                break;
            }
            case StatusUpdate: {
                final InstanceInfo current = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, current);
                break;
            }
            case DeleteStatusOverride: {
                final InstanceInfo current = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, current);
                break;
            }
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
- PeerEurekaNode 对等Eureka节点
/**
*
* 代表了一个集群中节点,用于共享eureka的注册行为,包括
* Register,Renew,Cancel,Expiration and Status Changes等
*
* @author Karthik Ranganathan, Greg Kim
*
*/
public class PeerEurekaNode {
// 用于复制注册操作的HttpClient,发送请求时会自动将isReplication设为true,即表示这是一次节点间复制的请求
private final HttpReplicationClient replicationClient;
/**
 * Sends a replicated registration of the given instance to the peer node this object
 * represents.
 *
 * <p>The HTTP call is not made inline. A replication task is handed to
 * {@code batchingDispatcher}, together with an expiry time of "now + getLeaseRenewalOf(info)".
 * NOTE(review): the article says this dispatcher runs tasks asynchronously on a background
 * thread, drops expired tasks, and lets a newer task replace an older one with the same id.
 * That is not verifiable from this excerpt.
 *
 * @param info the instance whose registration is replicated
 * @throws Exception as declared by the signature
 */
public void register(final InstanceInfo info) throws Exception {
    // NOTE(review): presumably the instance's lease renewal interval in milliseconds; confirm.
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
            // Task id derived from "register" and the instance; it identifies this instance's
            // registration task to the dispatcher (exact format is defined by taskId(...)).
            taskId("register", info),
            // The work itself: forward the registration through the replication client.
            // NOTE(review): per the article, this client marks its requests as replications.
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                @Override
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}
Server端注册的重点来了,抽象类AbstractInstanceRegistry的register(...)注册方法
- AbstractInstanceRegistry
/**
* Handles all registry requests from eureka clients.
* 负责处理所有来自eureka 客户点的注册请求
*
* <p>
* Primary operations that are performed are the
* <em>Registers</em>, <em>Renewals</em>, <em>Cancels</em>, <em>Expirations</em>, and <em>Status Changes</em>. The
* registry also stores only the delta operations
* </p>
*
* 执行的主要操作有,注册,续约,取消注册,过期和状态变化,注册表只会存储delta操作
*
* @author Karthik Ranganathan
*
*/
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
// 用于保存客户端注册信息的双层Map结构
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
// 定义读写锁,用在注册相关操作上
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 读锁,为共享锁
private final Lock read = readWriteLock.readLock();
// 写锁,排他锁
private final Lock write = readWriteLock.writeLock();
protected final Object lock = new Object();
// 持有eureka server配置信息以及eureka client配置信息
protected final EurekaServerConfig serverConfig;
protected final EurekaClientConfig clientConfig;
// 负责缓存客户端注册信息的类,以供eureka client 查询
protected volatile ResponseCache responseCache;
/**
 * Registers an instance with the given lease duration.
 *
 * <p>Steps:
 * <ol>
 *   <li>Find or atomically create the per-application map.</li>
 *   <li>Build a fresh {@link Lease} and store it under the instance id. If an existing lease
 *       holds an InstanceInfo with a strictly newer dirty timestamp, that InstanceInfo is kept
 *       as the holder, and the previous service-up timestamp is carried over.</li>
 *   <li>Apply overridden-status rules.</li>
 *   <li>Record the change in the recent-registration and recently-changed queues.</li>
 *   <li>Invalidate cached entries for the application.</li>
 * </ol>
 *
 * @param registrant    the instance being registered
 * @param leaseDuration lease duration, in seconds, for the new lease
 * @param isReplication {@code true} if this registration was replicated from another node
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // 1. Acquire the read (shared) lock BEFORE entering try, per the java.util.concurrent.locks.Lock
    //    usage idiom. If lock() itself failed, finally must not call unlock() on a lock this
    //    thread never held; that would throw IllegalMonitorStateException and mask the error.
    //    Why a *read* lock for a write? The registry and the per-app maps are ConcurrentHashMaps,
    //    so concurrent registrations can safely share the lock. NOTE(review): presumably the
    //    exclusive write lock is taken elsewhere (e.g. when building registry deltas) to get a
    //    consistent snapshot; confirm against the rest of AbstractInstanceRegistry.
    read.lock();
    try {
        // 2. Look up the per-application map (outer key: appName).
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        // 3. No map yet: this is the first instance registered for this application.
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            // putIfAbsent inserts only when the key is absent and returns the previous value:
            // null if this thread's map was inserted, otherwise the map another thread put first.
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                // This thread won the race; use the map it just inserted.
                gMap = gNewMap;
            }
            // Otherwise another thread inserted first, and gMap already holds that thread's map.
        }
        // 4. Look up any existing lease for this instance (inner key: instance id).
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // 5a. An existing lease holds an InstanceInfo: compare dirty timestamps.
            //     NOTE(review): both are boxed Longs; a null value would throw
            //     NullPointerException on unboxing in the comparison below.
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // 5b. The lease does not exist and hence it is a new registration.
            //     The renew-threshold bookkeeping is serialised on `lock`.
            synchronized (lock) {
                // NOTE(review): the expected count is only incremented when it is already positive.
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // 6. Always build a fresh lease. If a lease already existed, carry over its service-up
        //    timestamp.
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // 7. Store the new lease (replacing any previous one) and record the recent registration.
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        // A status already stored in the override map takes precedence over the registrant's own.
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }
        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        // 8. Append the new lease to the recently-changed queue.
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // 9. Invalidate cached entries for this app and its VIP / secure VIP addresses.
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        // 10. Release the read lock (always held here, since it was acquired before try).
        read.unlock();
    }
}
总结一下register()做的事情就是
-
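将client的注册信息封装成Lease<InstanceInfo>租约,存入双层Map结构中(外层key为appName,内层key为实例id)。不管该Client的Lease信息是否已存在,都会new一个新的Lease对象并put进内层Map;若已存在的租约中InstanceInfo的lastDirtyTimestamp比本次注册的更大,则沿用已存在的InstanceInfo作为新租约的持有者,同时新租约会保留原租约的服务启动时间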
-
更新recentRegisteredQueue,recentlyChangedQueue
-
失效responseCache缓存
网友评论