eureka-server service registration implementation, cluster synchronization, and eureka

Author: 蕃茄那个西红柿 | Source: published 2020-12-31 22:29

Stepping through registration in a debugger leads to InstanceRegistry:

ApplicationResource#addInstance:
    registry.register(info, "true".equals(isReplication));  // ------------------> InstanceRegistry#register

InstanceRegistry#register:
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // Spring Cloud publishes an EurekaInstanceRegisteredEvent
        handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);

        // Call the parent class's register (PeerAwareInstanceRegistryImpl#register)
        super.register(info, isReplication);
    }
 
---------------------------->>

PeerAwareInstanceRegistryImpl#register:

/**
 * Registers the information about the {@link InstanceInfo} and replicates
 * this information to all peer eureka nodes. If this is replication event
 * from other replica nodes then it is not replicated.
 *
 * @param info
 *            the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 *            true if this is a replication event from other replica nodes,
 *            false otherwise.
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // Lease expiration defaults to 90 seconds unless the instance supplies a positive value
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register the instance locally
    super.register(info, leaseDuration, isReplication);
    // Replicate the registration to the other cluster nodes
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
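To make the order of operations concrete, here is a minimal Java sketch of the flow through PeerAwareInstanceRegistryImpl#register: resolve the lease duration, register locally, then replicate. SimpleInstance and PeerAwareRegistrySketch are hypothetical stand-ins, not Eureka's API, and the replication step is reduced to a single isReplication check instead of a real replicateToPeers call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for InstanceInfo: only the fields this sketch needs.
class SimpleInstance {
    final String appName;
    final String id;
    final Integer leaseDurationSecs; // null models "no LeaseInfo supplied"

    SimpleInstance(String appName, String id, Integer leaseDurationSecs) {
        this.appName = appName;
        this.id = id;
        this.leaseDurationSecs = leaseDurationSecs;
    }
}

// Hypothetical sketch of the call order in PeerAwareInstanceRegistryImpl#register.
class PeerAwareRegistrySketch {
    static final int DEFAULT_DURATION_IN_SECS = 90; // same value as Lease.DEFAULT_DURATION_IN_SECS
    final List<String> calls = new ArrayList<>();  // records what happened, for illustration

    static int resolveLeaseDuration(SimpleInstance info) {
        // Only a present, positive duration overrides the 90-second default
        if (info.leaseDurationSecs != null && info.leaseDurationSecs > 0) {
            return info.leaseDurationSecs;
        }
        return DEFAULT_DURATION_IN_SECS;
    }

    void register(SimpleInstance info, boolean isReplication) {
        int leaseDuration = resolveLeaseDuration(info);
        // Step 1: local registration (AbstractInstanceRegistry#register in Eureka)
        calls.add("local:" + info.appName + "/" + info.id + ":" + leaseDuration);
        // Step 2: replicate, unless this request already came from a peer
        if (!isReplication) {
            calls.add("replicate:" + info.appName + "/" + info.id);
        }
    }
}
```

A registration arriving from a client (isReplication = false) produces both a local entry and a replication; one arriving from a peer produces only the local entry.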

AbstractInstanceRegistry#register: service registration

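As we know, the Spring container keeps its singleton beans in a ConcurrentHashMap. So what data structure sits underneath Eureka's registry? Unsurprisingly, it is also a map: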

ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<>();

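The registry above is effectively <applicationName, Map<instanceId, Lease>>. Instances of the same service are grouped under one application name so that they form a cluster, which makes load balancing easy when an upstream module calls that service.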
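A stripped-down Java sketch of this two-level map, assuming a plain String in place of Lease<InstanceInfo>; TwoLevelRegistry and its methods are hypothetical names. It uses the same putIfAbsent pattern as the register method below, so two threads registering the first instances of the same application at the same time still end up sharing one inner map.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: appName -> (instanceId -> lease), with String standing in for Lease<InstanceInfo>.
class TwoLevelRegistry {
    private final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    void register(String appName, String instanceId, String lease) {
        Map<String, String> gMap = registry.get(appName);
        if (gMap == null) {
            // If another thread created the inner map first, putIfAbsent returns it and we use that one
            ConcurrentHashMap<String, String> gNewMap = new ConcurrentHashMap<>();
            gMap = registry.putIfAbsent(appName, gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Instances of one application are distinguished by instance id
        gMap.put(instanceId, lease);
    }

    // All instances of one application: the candidates a load balancer would choose from
    Map<String, String> instancesOf(String appName) {
        Map<String, String> gMap = registry.get(appName);
        return gMap == null ? Collections.emptyMap() : gMap;
    }

    int applicationCount() {
        return registry.size();
    }
}
```

On Java 8+, registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()) does the same in one call; the explicit putIfAbsent form is kept because it mirrors the quoted Eureka code.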

/**
 * Registers a new instance with a given duration.
 *
 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
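        // Look up the instances by application name: instances of the same microservice share an
        // application name but have distinct instance ids (this enables clustering and load balancing)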
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Within one application, instances are distinguished by instance id;
        // try to fetch an existing lease for this id
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // lastDirtyTimestamp of the instance already in the registry
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            // lastDirtyTimestamp of the instance being registered
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
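            // If the existing instance's timestamp is newer (a larger timestamp means a more recent change),
            // keep the existing InstanceInfo instead of the registrant: on conflict, the newest instance wins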
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                // Number of clients expected to send heartbeats (renewals)
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
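                    // One more client will now send heartbeats, so add 1; the per-minute factor
                    // (60 / 30 s = 2 heartbeats per client by default) is applied in updateRenewsPerMinThreshold()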
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // Put (or replace) this instance's lease in the application's map
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            // Record the instance in the recently-registered queue
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }
        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // Mark the instance as ADDED
        registrant.setActionType(ActionType.ADDED);
        // Add to the recently-changed queue, which keeps instances changed in roughly the last 3 minutes (by default) for delta fetches
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // Set the last-updated timestamp
        registrant.setLastUpdatedTimestamp();
        // Invalidate the cached responses for this app and its VIP addresses so readers see the change
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
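The conflict rule in the code above (keep the stored InstanceInfo only when its lastDirtyTimestamp is strictly greater, so ties go to the incoming registrant) can be isolated into a small Java sketch. DirtyTimestampResolver and its Info class are hypothetical; they model only the instance id, the lastDirtyTimestamp and a label.

```java
// Hypothetical sketch of the lastDirtyTimestamp conflict rule in AbstractInstanceRegistry#register.
class DirtyTimestampResolver {
    static final class Info {
        final String id;
        final long lastDirtyTimestamp;
        final String label;

        Info(String id, long lastDirtyTimestamp, String label) {
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
            this.label = label;
        }
    }

    /** Returns the Info that should end up stored for this registration. */
    static Info resolve(Info existing, Info registrant) {
        if (existing == null) {
            return registrant; // no lease yet: a brand-new registration
        }
        // Strictly greater: on equal timestamps the incoming registrant wins (">" rather than ">=")
        if (existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            return existing;
        }
        return registrant;
    }
}
```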

updateRenewsPerMinThreshold()

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

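expectedNumberOfClientsSendingRenews: the number of clients expected to send heartbeats
ExpectedClientRenewalIntervalSeconds: the expected interval between client heartbeats, in seconds (30 by default)
RenewalPercentThreshold: the renewal percentage threshold, 85% by default
numberOfRenewsPerMinThreshold: the per-minute heartbeat threshold; if the heartbeats the server receives within a minute do not exceed it, self-preservation mode is triggered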

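An example makes this clear:
suppose there are 100 clients with a 30 s heartbeat interval. If all of them are healthy, the server should receive 200 heartbeats per minute,
so the threshold is 200 * 85% = 170, and self-preservation is triggered when the server receives no more than 170 heartbeats in a minute.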
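The formula and the example can be checked with a short Java sketch. renewsPerMinThreshold reproduces updateRenewsPerMinThreshold() as a pure function; selfPreservationActive assumes self-preservation is enabled and is modelled on Eureka's isLeaseExpirationEnabled() check, where leases may expire only while renewals in the last minute are strictly greater than the threshold. Both method names are hypothetical.

```java
// Hypothetical sketch of the renewal threshold and the self-preservation decision.
class RenewThreshold {
    // Same arithmetic as updateRenewsPerMinThreshold()
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSecs, double percentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSecs) * percentThreshold);
    }

    // Assumes self-preservation is enabled; expiration is allowed only above the threshold
    static boolean selfPreservationActive(long renewsLastMin, int threshold) {
        boolean leaseExpirationEnabled = threshold > 0 && renewsLastMin > threshold;
        return !leaseExpirationEnabled;
    }

    public static void main(String[] args) {
        int threshold = renewsPerMinThreshold(100, 30, 0.85);
        System.out.println(threshold);                              // 170
        System.out.println(selfPreservationActive(150, threshold)); // true
        System.out.println(selfPreservationActive(195, threshold)); // false
    }
}
```

The (int) cast truncates, so with a single client the threshold is (int) 1.7 = 1.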

/**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            // If this is a replication request from a peer, count it in the last-minute replication counter
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            // Return immediately if there are no peer nodes or this request is itself a replication from another Eureka Server
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            // Iterate over the peer Eureka Server nodes and replicate to each one
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                // Send the replication request
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

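Cluster synchronization steps

1. If there are no peer nodes, return.
2. isReplication indicates whether this is a replication request; isReplication = true means the request was sent by another Eureka Server.
In that case it must not be replicated any further, otherwise the servers would replicate to each other in an endless loop.
3. Loop over the peer nodes, skipping this node itself.
4. Send the replication request by calling replicateInstanceActionsToPeers.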
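A minimal in-memory Java simulation of these steps, assuming synchronous method calls instead of Eureka's HTTP replication (the real PeerEurekaNode hands replication off to a background task dispatcher). PeerNode is a hypothetical class: each node registers locally, then forwards the registration with isReplication = true to every peer except itself, and a node that receives a replicated request stops there.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of replicateToPeers' guard logic, without any networking.
class PeerNode {
    final String url;
    final List<PeerNode> peers = new ArrayList<>();
    final List<String> registered = new ArrayList<>();
    int replicationsReceived = 0;

    PeerNode(String url) {
        this.url = url;
    }

    void register(String instanceId, boolean isReplication) {
        registered.add(instanceId); // local registration
        if (isReplication) {
            replicationsReceived++; // similar to numberOfReplicationsLastMin.increment()
            return;                 // step 2: never re-replicate a replicated request
        }
        for (PeerNode peer : peers) {
            if (peer.url.equals(this.url)) {
                continue; // step 3: skip ourselves, like isThisMyUrl()
            }
            peer.register(instanceId, true); // step 4: forward with isReplication = true
        }
    }
}
```

Removing the `if (isReplication) return;` guard would make every receiving node forward the request again, recursing without end: exactly the loop that step 2 prevents.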

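PS: PeerEurekaNode came up here. As for how the PeerEurekaNodes cluster membership is updated and read: when the server starts, PeerEurekaNodes starts a thread that refreshes the peer node information every 10 minutes (by default).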
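A hypothetical Java sketch of such a periodic refresh using a ScheduledExecutorService. PeerListRefresher and the Supplier that provides peer URLs are assumptions for illustration, not Eureka's classes; the point is the pattern: load once at startup, refresh on a fixed delay, and keep the old list if a refresh fails, because a task that throws would have its future runs suppressed by the executor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch: periodically reload the list of peer URLs.
class PeerListRefresher {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "peer-refresher");
        t.setDaemon(true);
        return t;
    });
    private final Supplier<List<String>> urlSource; // assumed source of peer URLs (config, DNS, ...)
    private volatile List<String> peerUrls = Collections.emptyList();

    PeerListRefresher(Supplier<List<String>> urlSource) {
        this.urlSource = urlSource;
    }

    void start(long intervalMs) {
        refreshNow(); // initial load at startup
        scheduler.scheduleWithFixedDelay(this::refreshNow, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void refreshNow() {
        try {
            peerUrls = Collections.unmodifiableList(new ArrayList<>(urlSource.get()));
        } catch (RuntimeException e) {
            // Keep the previous list; an exception escaping here would cancel later scheduled runs
        }
    }

    List<String> peers() {
        return peerUrls;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

With a 10-minute interval this would be started as `refresher.start(10 * 60 * 1000L);`.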

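Summary

1: @EnableEurekaServer registers a Marker bean, which marks the application as a registry server.
2: EurekaServerAutoConfiguration registers a filter that intercepts Jersey requests and dispatches them to the resource classes.
3: Service registration stores the instance information in a ConcurrentHashMap<name, Map<id, Lease>>.
4: On a registration conflict, the most recently modified instance wins.
5: If the server receives less than 85% of the heartbeats it should receive in a minute, self-preservation is triggered.
6: Lease#renew has a bug: duration is added one extra time; there should instead be an expireTime field holding the expiry time.
7: Cluster synchronization: an instance first registers with one server, which then iterates over the other server nodes in the cluster and calls register on each of them.
isReplication=true marks a registration that came from cluster synchronization; such a registration is not replicated again, which prevents endless re-registration.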
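Point 6 can be demonstrated with a simplified Java re-creation of the timing logic in Eureka's Lease class, where renew() sets lastUpdateTimestamp = now + duration and isExpired() then checks now > lastUpdateTimestamp + duration + additionalLeaseMs, so the duration is counted twice. LeaseSketch and FixedLeaseSketch are hypothetical: the clock is passed in explicitly, additionalLeaseMs and the eviction timestamp are omitted, and FixedLeaseSketch is the expireTime variant suggested above.

```java
// Hypothetical re-creation of Lease's renew/isExpired timing (clock passed in for testability).
class LeaseSketch {
    final long durationMs;
    long lastUpdateTimestamp;

    LeaseSketch(long durationMs) {
        this.durationMs = durationMs;
    }

    void renew(long now) {
        lastUpdateTimestamp = now + durationMs; // the extra "+ duration"
    }

    boolean isExpired(long now) {
        return now > lastUpdateTimestamp + durationMs; // duration added a second time
    }
}

// Hypothetical fix: store the expiry time explicitly and compare against it.
class FixedLeaseSketch {
    final long durationMs;
    long expireTime;

    FixedLeaseSketch(long durationMs) {
        this.durationMs = durationMs;
    }

    void renew(long now) {
        expireTime = now + durationMs;
    }

    boolean isExpired(long now) {
        return now > expireTime;
    }
}
```

With a 90-second lease, LeaseSketch is still considered alive 100 seconds after its last renewal and only expires after 180 seconds, whereas FixedLeaseSketch expires after 90 seconds.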


Article link: https://www.haomeiwen.com/subject/twfdoktx.html