美文网首页
eureka-server服务注册的实现、集群同步及eureka

eureka-server服务注册的实现、集群同步及eureka

作者: 蕃茄那个西红柿 | 来源:发表于2020-12-31 22:29 被阅读0次

    引用

    注册debug追踪到InstanceRegistry

    ApplicationResource #addInstance
     registry.register(info, "true".equals(isReplication));------------------>
     
    InstanceRegistry # 
        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            //springcloud发布一个事件EurekaInstanceRegisteredEvent
            handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
     
            //调用父类的register
            super.register(info, isReplication);
        }
     
    ---------------------------->>
    

    PeerAwareInstanceRegistryImpl #

    /**                                                                                                           
     * Registers the information about the {@link InstanceInfo} and replicates                                    
     * this information to all peer eureka nodes. If this is replication event                                    
     * from other replica nodes then it is not replicated.                                                        
     *                                                                                                            
     * @param info                                                                                                
     *            the {@link InstanceInfo} to be registered and replicated.                                       
     * @param isReplication                                                                                       
     *            true if this is a replication event from other replica nodes,                                   
     *            false otherwise.                                                                                
     */                                                                                                           
    @Override                                                                                                     
    public void register(final InstanceInfo info, final boolean isReplication) {  
        //服务过期时间默认90秒                                
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;                                                       
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {                         
            leaseDuration = info.getLeaseInfo().getDurationInSecs();                                              
        }         
        //服务注册                                                                                                
        super.register(info, leaseDuration, isReplication);
        //集群同步                                                       
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);            
    
    

    AbstractInstanceRegistry#register 服务注册

    我们知道spring容器底层就是一个ConcurrentHashMap,那么eureka的底层注册是什么样的数据结构呢?没错一定也是一个map.

    ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();
    

    registry的数据结构如上等价于<applicationName, Map<instanceId, Lease>>,为了进行相同服务的集群话,为上一层模块进行调用时方便负载均衡.

    /**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            //根据应用名称获取对应的服务,因为微服务的application name可以相同,
            //服务实例instance id是不同的(方便集群,为负载均衡作准备),
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
             //如果上面存在相同的服务的application name的微服务,那么就根据对应的服务的实例instance id来区分
             //尝试通过id拿到一个微服务实例,
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                //已经存在的微服务实例最后修改时间
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                //要注册的微服务实例最后修改时间
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                //如果已存的微服务时间>要注册的(时间越大说明操作越新),用已存的覆盖要注册的
                //即如果出现冲突的话拿最新的微服务实例
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    //期待发送心跳的客户端数量
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        //要注册进来了,默认心跳30秒一次,每次心跳在原基础上加一,一分钟2次所以加2
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            //如果if(gMap == null)都没有进,说明微服务组内已经有微服务了,直接put(id,instance)即可
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                //最近注册队列添加此微服务
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            //标记微服务实例ADDED
            registrant.setActionType(ActionType.ADDED);
            //最近改变队列添加此微服务,此队列会保存近三分钟有改动的微服务,用于增量更新
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            //设置最后更新的时间戳
            registrant.setLastUpdatedTimestamp();
            // 放入缓存中
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
    

    updateRenewsPerMinThreshold()

    protected void updateRenewsPerMinThreshold() {
        this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                * serverConfig.getRenewalPercentThreshold());
    }
    

    expectedNumberOfClientsSendingRenews:期待发送心跳的客户端数量
    ExpectedClientRenewalIntervalSeconds:期待客户端发送心跳的间隔秒数
    RenewalPercentThreshold:续期的百分比阈值85%
    numberOfRenewsPerMinThreshold:客户端每分钟发送心跳数的阈值,如果server在一分钟内没有收到这么多的心跳数就会触发自我保护机制

    举个例子就明白了:
    假设有100个客户端,发送心跳间隔为30s,那么一分钟如果全部正常的话server收到的心跳应该是200次,
    如果server一分钟收到的心跳<200*85%,即170个触发自我保护机制

    /**
         * Replicates all eureka actions to peer eureka nodes except for replication
         * traffic to this node.
         *
         */
        private void replicateToPeers(Action action, String appName, String id,
                                      InstanceInfo info /* optional */,
                                      InstanceStatus newStatus /* optional */, boolean isReplication) {
            Stopwatch tracer = action.getTimer().start();
            try {
                // 判断是否是集群同步请求,如果是,则记录最后一分钟的同步次数
                if (isReplication) {
                    numberOfReplicationsLastMin.increment();
                }
                // If it is a replication already, do not replicate again as this will create a poison replication
                // 集群节点为空,或者这是一个Eureka Server 同步请求,直接return
                if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                    return;
                }
                // 循环相邻的Eureka Server Node, 分别发起请求同步
                for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                    // If the url represents this host, do not replicate to yourself.
                    if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        continue;
                    }
                    // 发起同步请求
                    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            } finally {
                tracer.stop();
            }
        }
    

    集群同步步骤

    1.判断集群节点是否为空,为空则返回
    2.isReplication 代表是否是一个复制请求, isReplication = true 表示是其他Eureka Server发过来的同步请求
    这个时候是不需要继续往下同步的。否则会陷入同步死循环
    3.循环集群节点,过滤掉自身的节点
    4.发起同步请求 ,调用replicateInstanceActionsToPeers

    PS: 这里提到了PeerEurekaNode , 对于PeerEurekaNodes的集群节点更新及数据读取,在服务启动的时候,对PeerEurekaNodes集群开启了线程更新集群节点信息。每10分钟一次

    总结

    1:@EnableEurekaServer注入一个Marker类,说明是一个注册中心
    2:EurekaServerAutoConfiguration注入一个filter,来拦截jersey请求转发给resource
    3:服务注册,就是把信息存到一个ConcurrentHashMap<name, Map<id,Lease>>
    4:对于注册冲突拿最新的微服务实例
    5:server每分钟内收到的心跳数低于理应收到的85%就会触发自我保护机制
    6:Lease的renew bug, duration多加了一次,理应加一个expireTime表示过期时间
    7:集群同步:先注册到一台server,然后遍历其他的集群的其他server节点调用register注册到其他server,
    isReplication=true代表此次注册来源于集群同步的注册,代表此次注册不要再进行集群同步,避免无限注册

    相关文章

      网友评论

          本文标题:eureka-server服务注册的实现、集群同步及eureka

          本文链接:https://www.haomeiwen.com/subject/twfdoktx.html