美文网首页
Eureka之Server端注册

Eureka之Server端注册

作者: 0爱上1 | 来源:发表于2019-04-30 17:07 被阅读0次

    上一篇文章Eureka源码之Client端注册分析了Client端注册的整体流程,本文将基于Server端源码,分析Server端是如何处理客户端的注册请求的

    注册时序图

    注册时序图

    源码

    ApplicationResource

    Jersey是一个轻量级的RESTful 框架,可以进一步地简化 RESTful service 和 client 开发,而ApplicationResource就是基于Jersey框架来处理请求的

    // Produces注解规定了请求响应体的MiME类型
    @Produces({"application/xml", "application/json"})
    public class ApplicationResource {
    
    private final String appName;
    
    // EurekaServer 配置信息
    private final EurekaServerConfig serverConfig;
    
    // 负责注册行为的"对等实例注册器"
    private final PeerAwareInstanceRegistry registry;
    private final ResponseCache responseCache;
    
    ApplicationResource(String appName,
                        EurekaServerConfig serverConfig,
                        PeerAwareInstanceRegistry registry) {
        this.appName = appName.toUpperCase();
        this.serverConfig = serverConfig;
        this.registry = registry;
        this.responseCache = registry.getResponseCache();
    }
    
    public String getAppName() {
        return appName;
    }
    
    /**
     * Registers information about a particular instance for an Application
     * POST请求:为应用程序注册关于特定实例的信息
     * 
     * @param info 注册实例的信息
     * @param isReplication 在header中的参数,表示是否是一个其他server节点复制的标识
     * 
     */
    @POST
    // Consumes 注解规定了请求方法接收体的MIME类型
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    
        // 打印入参日志                           
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        
        // 实例信息必要参数非空校验,返回400错误码
        ...
        
        // 获取注册实例信息的数据中心信息,处理客户端可能使用错误的DataCenterInfo注册丢失数据的情况
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
    
        // 注册实例,将注册行为委托给了PeerAwareInstanceRegistry 类去完成,也即PeerAwareInstanceRegistryImpl
        registry.register(info, "true".equals(isReplication));
        // 注册成功,返回204状态码
        return Response.status(204).build();
    }
    }
    

    PeerAwareInstanceRegistryImpl

    单例的对等实例注册处理器,负责将注册相关操作(主要有注册,心跳续约,取消,状态改变以及过期)复制到其他对等节点,以使节点之间保持同步

    一个Eureka Server只有一个PeerAwareInstanceRegistryImpl实例

    • UML类图
    PeerAwareInstanceRegistryImpl
    • 类源码
    @Singleton
    public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    
    // volatile 修饰的集群对等节点
    protected volatile PeerEurekaNodes peerEurekaNodes;
    
    // 内部定义的枚举类,用于描述eureka注册操作行为
    public enum Action {
        Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
    
        // 返回该枚举名字的监视类型,用于跟踪某项具体枚举类型操作花费的时间
        private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());
        
        public com.netflix.servo.monitor.Timer getTimer() {
            return this.timer;
        }
    }
    
    // 内部属性以及其他方法省略
    ...
    
    /**
     * Registers the information about the {@link InstanceInfo} and replicates
     * this information to all peer eureka nodes. If this is replication event
     * from other replica nodes then it is not replicated.
     * 
     * 调用父类的注册方法完成此次注册操作,同时调用内部私有的replicateToPeers方法去完成server节点间的复制操作
     * 
     * @param info
     *            the {@link InstanceInfo} to be registered and replicated.
     * @param isReplication
     *            true if this is a replication event from other replica nodes,
     *            false otherwise.
     */
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // 获取Client注册实例的租约持续时间,即server多久收不到该实例的心跳就会将该实例剔除掉,默认90秒
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    
        // 若Client注册实例设置了租约信息且租约持续时间大于0,则替换默认的90秒
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
    
        // 调用抽象父类的注册方法,完成注册行为
        super.register(info, leaseDuration, isReplication);
    
        // 将注册行为同步到集群内的其他节点
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    
    /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     * 将eureka操作复制到集群其他节点,除了自己
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        // 启动监视该注册操作类型跑表
        Stopwatch tracer = action.getTimer().start();
        try {
            // 如果是其他节点的复制操作,则增加最近一分钟内节点复制操作的次数
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
        
            // If it is a replication already, do not replicate again as this will create a poison replication
            // 如果是一个来自其他server节点的复制操作,不作再次复制操作,因为方法调用之前会执行注册操作,直接返回
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
    
            // 循环所有对等节点,逐一复制
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                // 若节点是自身,不作复制操作
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                // 复制实例注册操作到节点
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            // 停止跑表
            tracer.stop();
        }
    }
    
    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     * 根据不同的注册Action,由PeerEurekaNode调用不同的方法实现复制
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
    
            // 可以看到这里针对注册行为的不同,做了不同的前置处理,但最终都是将复制操作交由PeerEurekaNode类来完成的
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }
    
    • PeerEurekaNode 对等Eureka节点
        /**
    * 
    * 代表了一个集群中节点,用于共享eureka的注册行为,包括
    * Register,Renew,Cancel,Expiration and Status Changes等
    *
    * @author Karthik Ranganathan, Greg Kim
    *
    */
    public class PeerEurekaNode {
    
    // 用于复制注册操作的HttpClient,发送请求时会自动将isReplication设为true,即表示这是一次节点间复制的请求
    private final HttpReplicationClient replicationClient;
    
    /**
     * 
     * 发送复制的注册信息到该类代表的对等节点
     *
     * @param info 实例信息
     * @throws Exception
     */
    public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        // 这里交给了一个批量任务执行器去执行
        batchingDispatcher.process(
                // 获取执行的任务id = "register" + "#" + appName + instanceId,用于唯一标识一个实例的复制注册行为
                // 该批量任务执行器默认会丢弃掉过期的任务,以及新任务来临会自动丢弃老任务,后台线程异步执行
                taskId("register", info),
    
                // 任务具体执行的逻辑,就是调用了replicationClient 的注册方法
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }
    

    Server端注册的重点来了,抽象类AbstractInstanceRegistry的register(...)注册方法

    • AbstractInstanceRegistry
    /**
    * Handles all registry requests from eureka clients.
    * 负责处理所有来自eureka 客户点的注册请求
    *
    * <p>
    * Primary operations that are performed are the
    * <em>Registers</em>, <em>Renewals</em>, <em>Cancels</em>, <em>Expirations</em>, and <em>Status Changes</em>. The
    * registry also stores only the delta operations
    * </p>
    * 
    * 执行的主要操作有,注册,续约,取消注册,过期和状态变化,注册表只会存储delta操作
    *
    * @author Karthik Ranganathan
    *
    */
    public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    
    // 用于保存客户端注册信息的双层Map结构
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
    
    // 定义读写锁,用在注册相关操作上
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    
    // 读锁,为共享锁
    private final Lock read = readWriteLock.readLock();
    
    // 写锁,排他锁
    private final Lock write = readWriteLock.writeLock();
    protected final Object lock = new Object();
    
    // 持有eureka server配置信息以及eureka client配置信息
    protected final EurekaServerConfig serverConfig;
    protected final EurekaClientConfig clientConfig;
    
    // 负责缓存客户端注册信息的类,以供eureka client 查询
    protected volatile ResponseCache responseCache;
    
    /**
     * Registers a new instance with a given duration.
     * 根据Client指定的持续时间,注册一个实例
     * 
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // 1. 获取读锁
            // TODO 这里留下疑问,为什么注册操作会使用读锁?
            read.lock();
    
            // 2. 根据此次注册client的appName,从注册表获取是否有已注册信息
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
    
            // 3. 如果没有已注册信息,表示首次注册
            if (gMap == null) {
                // 3.1 new一个该appName对应的注册信息Map
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                
                // 3.2 使用putIfAbsent向registry中新增元素
                // 只有当registry中不存在指定的key时,才会执行put元素操作,否则不执行put操作,另外一点是无论有没有执行put操作,都会返回preview 元素
                // 即当不存在指定key,就put元素,返回null,若存在key,则不执行put操作,返回已存在的value值
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
    
                // 3.3 如果返回null,则表示当前线程put成功,即registry中保存的value为当前线程new的值,即gMap = gNewMap
                if (gMap == null) {
                    gMap = gNewMap;
                }
                // 3.4 如果返回不是null,则表示可能有其他线程先put了,那3.2 中的gMap变量就是其他线程先put后的value
            }
    
            // 4. 根据注册客户端的InstanceId,获取gMap中该客户端的注册租约信息Lease
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
    
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            // 5. 已存在注册租约信息,保留最后dirty timestamp不去覆盖
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
    
                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                // 注册租约信息不存在,表示这是一个新的注册,此处开始同步
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
    
            // 6. 不管之前的租约信息存在还是不存在,都会new一个新的租约注册信息,如果之前的租约信息存在,就更新新租约的服务启动时间为已存在租约的服务启动时间
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
    
            // 7. 将新租约的信息put进gMap中
            gMap.put(registrant.getId(), lease);
    
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
    
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            
            // 8. 新增最近改变的租约队列
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
    
            // 9. 失效缓存信息
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            // 10. 释放读锁
            read.unlock();
        }
    }
    

    总结一下register()做的事情就是

    1. 将client的注册信息封装成Lease<InstanceInfo> 租约,存入双层map结构中,不管存在不存在该Client的Lease信息,都会重新new一个新的Lease对象,put进内层Map中

    2. 更新recentRegisteredQueue,recentlyChangedQueue

    3. 失效responseCache缓存

    相关文章

      网友评论

          本文标题:Eureka之Server端注册

          本文链接:https://www.haomeiwen.com/subject/dvttnqtx.html