Eureka Series (8): How Service Eviction Is Implemented

Author: 偷吃虾的猫 | Published: 2020-04-06 19:33

    Overall flowchart of service eviction

      The figure below gives a simple overview of the service eviction flow:
      [Figure: service eviction flowchart]

    Source code analysis of service eviction

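      First, we need to know when the eviction scheduled task is initialized and started. After some searching online, I found that it is initialized in the initEurekaServerContext method of the EurekaBootStrap class, which runs when the Eureka Server starts. Let's look at the source: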

    protected void initEurekaServerContext() throws Exception {
            // ... other code omitted
            registry.openForTraffic(applicationInfoManager, registryCount);
            // Register all monitoring statistics.
            EurekaMonitors.registerAllStats();
        }
    

      Inside initEurekaServerContext(), the call registry.openForTraffic(applicationInfoManager, registryCount) is what initializes the eviction task. Let's confirm this in the source:

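    // Spring Cloud Netflix's InstanceRegistry: falls back to defaultOpenForTrafficCount when count is 0
    @Override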
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
       super.openForTraffic(applicationInfoManager,
             count == 0 ? this.defaultOpenForTrafficCount : count);
    }
    
    // Eureka's PeerAwareInstanceRegistryImpl (the parent class), reached via super.openForTraffic
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        this.expectedNumberOfRenewsPerMin = count * 2;
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
    }
    

      At the end of openForTraffic, the parent class's postInit() method is called. Let's look at postInit next:

    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        // Start the scheduled task: by default it runs every 60 s and evicts instances whose leases have expired
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }
    

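      As shown above, Eureka uses evictionTimer.schedule to start a task that runs every 60 s by default. The interval comes from serverConfig.getEvictionIntervalTimerInMs() and is used as both the initial delay and the period.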
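The scheduling pattern in postInit() can be tried out on its own. The sketch below is a minimal, hypothetical stand-in: the names EvictionSchedulingSketch, CountingTask and demo are made up for illustration. Like postInit(), it cancels any previously scheduled task. It then calls java.util.Timer.schedule(task, delay, period) with the same value for the initial delay and the period. The demo uses 20 ms instead of the 60 s default so it finishes quickly.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class EvictionSchedulingSketch {
    // Hypothetical stand-in for EvictionTask: counts down a latch on every run.
    static class CountingTask extends TimerTask {
        private final CountDownLatch runs;

        CountingTask(CountDownLatch runs) {
            this.runs = runs;
        }

        @Override
        public void run() {
            runs.countDown();
        }
    }

    // Daemon timer thread so the JVM can exit even if shutdown() is never called.
    private final Timer evictionTimer = new Timer("Eviction-Timer-Sketch", true);
    private final AtomicReference<TimerTask> evictionTaskRef = new AtomicReference<>();

    // Same shape as postInit(): cancel the old task, store the new one, schedule it.
    void postInit(TimerTask task, long intervalMs) {
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(task);
        evictionTimer.schedule(evictionTaskRef.get(), intervalMs, intervalMs);
    }

    void shutdown() {
        evictionTimer.cancel();
    }

    // Returns true if the task ran at least expectedRuns times within 5 seconds.
    static boolean demo(long intervalMs, int expectedRuns) {
        EvictionSchedulingSketch sketch = new EvictionSchedulingSketch();
        CountDownLatch runs = new CountDownLatch(expectedRuns);
        sketch.postInit(new CountingTask(runs), intervalMs);
        try {
            return runs.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran three times: " + demo(20, 3));
    }
}
```

Two details of java.util.Timer are worth knowing here:

- A TimerTask instance can be scheduled only once; Timer.schedule throws IllegalStateException for a task that was already scheduled or cancelled. This is presumably why postInit() creates a fresh EvictionTask instead of rescheduling the old one.
- schedule(task, delay, period) uses fixed-delay execution. A run that starts late (for example after a long GC pause) pushes later runs back.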
      Next, let's look at EvictionTask, the class that performs the actual eviction. Its run() method:

    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }
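run() calls getCompensationTimeMs(), which this article does not show. The sketch below assumes it measures how much later than scheduled the current run is: the time elapsed since the previous run minus the configured eviction interval, never negative. That value is then passed to evict() as additionalLeaseMs, giving every lease the same extra grace. The class name CompensationSketch and the explicit nowNanos parameter are hypothetical; they let the logic be tested with a fake clock.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the compensation idea; not the actual Eureka class.
class CompensationSketch {
    private final long evictionIntervalMs;
    // 0 means "no previous run recorded yet".
    private final AtomicLong lastExecutionNanos = new AtomicLong(0L);

    CompensationSketch(long evictionIntervalMs) {
        this.evictionIntervalMs = evictionIntervalMs;
    }

    // How many ms later than the configured interval this run happened (never negative).
    long compensationTimeMs(long nowNanos) {
        long lastNanos = lastExecutionNanos.getAndSet(nowNanos);
        if (lastNanos == 0L) {
            return 0L; // first run: nothing to compare against
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nowNanos - lastNanos);
        long compensation = elapsedMs - evictionIntervalMs;
        return compensation <= 0L ? 0L : compensation;
    }

    public static void main(String[] args) {
        CompensationSketch sketch = new CompensationSketch(60_000);
        long t0 = TimeUnit.SECONDS.toNanos(1);
        System.out.println(sketch.compensationTimeMs(t0));                                  // 0 (first run)
        System.out.println(sketch.compensationTimeMs(t0 + TimeUnit.SECONDS.toNanos(60)));  // 0 (on time)
        System.out.println(sketch.compensationTimeMs(t0 + TimeUnit.SECONDS.toNanos(135))); // 15000 (15 s late)
    }
}
```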
    

      Next, the implementation of evict():

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        // First collect all expired instances, then evict them
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }
        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;
    
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
    
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
    
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }
    

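      So evict() is where instances are actually removed. It collects the expired leases and caps their number with evictionLimit. Then it calls internalCancel(appName, id, false) for each randomly chosen lease.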
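The two safeguards at the end of evict() can be isolated in a small sketch: the per-run eviction cap and the random selection. evictionLimit() repeats the arithmetic from evict(). pickToEvict() performs the same partial Knuth (Fisher-Yates) shuffle, so only the first toEvict slots are shuffled and each chosen item is distinct. The class and method names are hypothetical, and 0.85 in the example is only an assumed value for renewalPercentThreshold.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch mirroring the cap and shuffle logic in evict().
class EvictionSelectionSketch {
    // Same arithmetic as evict(): at most registrySize - (int) (registrySize * threshold) per run.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Partial Fisher-Yates shuffle: picks min(expired.size(), limit) distinct items at random.
    static <T> List<T> pickToEvict(List<T> expired, int limit, Random random) {
        List<T> items = new ArrayList<>(expired); // copy so the caller's list is not reordered
        int toEvict = Math.max(0, Math.min(items.size(), limit));
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(items.size() - i);
            Collections.swap(items, i, next);
        }
        return new ArrayList<>(items.subList(0, toEvict));
    }

    public static void main(String[] args) {
        int limit = evictionLimit(10, 0.85); // 10 registered instances, assumed threshold 0.85
        List<String> expired = Arrays.asList("app/i1", "app/i2", "app/i3", "app/i4", "app/i5");
        List<String> evicted = pickToEvict(expired, limit, new Random());
        System.out.println("limit=" + limit + ", evicted=" + evicted);
    }
}
```

With 10 registered instances and a threshold of 0.85, registrySizeThreshold is (int) 8.5 = 8. At most 2 instances are therefore evicted per run, even if 5 leases have expired. The rest are left for later runs, provided self-preservation does not block them.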

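    Note:
      Eureka's eviction is affected by its self-preservation mechanism. While self-preservation is active, isLeaseExpirationEnabled() returns false and evict() returns early, so instances that are already considered down are not removed. Self-preservation is covered in the next article.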


      You may be wondering how Eureka decides whether an instance is unavailable. Let's look at lease.isExpired(additionalLeaseMs), the method that makes this decision:

    
        /**
         * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
         *
         * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
         * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
         * instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will
         * not be fixed.
         *
         * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
         */
        public boolean isExpired(long additionalLeaseMs) {
            return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
        }
    

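      As shown above, Eureka uses lastUpdateTimestamp (the time of the last update) to decide whether an instance is still alive. A lease whose evictionTimestamp is already set also counts as expired. Recall from the service renewal article that every time a client calls the server's renewal endpoint, the server updates that instance's lastUpdateTimestamp. Here is the renewal method that does it: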

    /**
     * Renew the lease, use renewal duration if it was specified by the
     * associated {@link T} during registration, otherwise default duration is
     * {@link #DEFAULT_DURATION_IN_SECS}.
     */
    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }
    

      Did you notice the "easter egg" in the Javadoc of isExpired? It says: Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will not be fixed.
      In short, renew() should not add duration to the current time. The effect only shows up when detecting instances that shut down ungracefully. The Eureka maintainers therefore left this minor bug unfixed rather than risk affecting deployments that already rely on the current behavior.
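      It may seem odd that Eureka's developers knowingly left a bug in place. A fix could break existing users, and the bug has little impact on normal operation. To their credit, they documented both the problem and the reason for not fixing it right in the comment.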
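The effect of this quirk is easy to check with a fake clock. The sketch below copies the expressions from renew() and isExpired(). The class name LeaseExpirySketch and the explicit nowMs parameters are hypothetical; they keep the test independent of System.currentTimeMillis(). The 90 s duration is assumed to match Eureka's usual default lease duration.

```java
// Hypothetical sketch of Lease's expiry arithmetic with an injectable clock.
class LeaseExpirySketch {
    final long durationMs;
    long lastUpdateTimestamp;
    long evictionTimestamp; // > 0 once the lease has been cancelled

    LeaseExpirySketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs; // registration time
    }

    // Same expression as Lease.renew(): adds duration on top of "now" (the documented quirk).
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    // Same check as Lease.isExpired(), with the current time passed in.
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return evictionTimestamp > 0 || nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        LeaseExpirySketch lease = new LeaseExpirySketch(90_000, 0); // assumed 90 s duration
        lease.renew(0); // last heartbeat at t = 0
        System.out.println(lease.isExpired(90_001, 0));  // false: still alive after one duration
        System.out.println(lease.isExpired(180_000, 0)); // false
        System.out.println(lease.isExpired(180_001, 0)); // true: expires only after 2 * duration
    }
}
```

This assumes the usual defaults of a 90 s lease duration and a 60 s eviction interval. An instance that dies right after a heartbeat then stays in the registry for roughly 180 to 240 s before an eviction run removes it. That estimate ignores compensation time and self-preservation.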


    An aside

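      You may ask: since there is already a service cancellation (deregistration) endpoint, why does the Eureka Server also need to run its own eviction task?
      The reason is simple. If a client is stopped forcibly, for example by a power failure, it never calls the cancellation endpoint to tell the server it is going offline. Even when a client shuts down normally, the network may fail during that call, so the server still never learns the instance is gone. The server therefore runs its own eviction task to remove instances that are already down. (This is my own view; feel free to think it through yourself.)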


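      Below are the other Eureka articles in my series, for anyone interested. A like is always appreciated, since writing these takes effort!
      Eureka Series (1): Introduction to Eureka's Features
      Eureka Series (2): Service Registration, Server-Side Implementation
      Eureka Series (3): Fetching Services, Client-Side Implementation
      Eureka Series (4): Fetching Services, Server-Side Implementation
      Eureka Series (5): How Service Renewal Is Implemented
      Eureka Series (6): The TimedSupervisorTask Class Explained
      Eureka Series (7): Service Cancellation, Server-Side Implementation
      Eureka Series (8): How Service Eviction Is Implemented
      Eureka Series (9): Eureka's Self-Preservation Mechanism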


          Article link: https://www.haomeiwen.com/subject/ozqmphtx.html