美文网首页
Eureka源码分析(六) TimedSupervisorTask

Eureka源码分析(六) TimedSupervisorTask

作者: skyguard | 来源:发表于2018-11-10 10:45 被阅读0次

    之前我们分析了eureka的注册服务实例信息,下面我们来分析下eureka的续租。当一个租约到期后,会有两种情况:一种是过期,EurekaServer将下线过期的节点;另一种是续租,当EurekaServer检测到节点还能正常通信时,将执行续租的操作。我们知道,检测节点状态是通过ScheduledExecutorService的schedule方法定时调度的,那么定时检测节点状态的任务具体是怎么执行的呢?答案就是TimedSupervisorTask。我们先来看下TimedSupervisorTask都有哪些属性:

    /**
     * Scheduler used to re-schedule this supervisor task after each run.
     */
    private final ScheduledExecutorService scheduler;
    /**
     * Thread pool that executes the supervised sub-task.
     */
    private final ThreadPoolExecutor executor;
    /**
     * Timeout, in milliseconds, allowed for one execution of the sub-task.
     * It is also the delay restored after a run that completes in time.
     */
    private final long timeoutMillis;
    /**
     * The supervised sub-task that is submitted to {@code executor}.
     */
    private final Runnable task;
    /**
     * Current delay, in milliseconds, before the next run of this supervisor.
     */
    private final AtomicLong delay;
    /**
     * Upper bound for {@code delay}.
     *
     * Applied when the delay is increased after the sub-task times out.
     */
    private final long maxDelay;
    

    TimedSupervisorTask 执行时,提交 task 到 executor 执行任务。
    当 task 执行正常,TimedSupervisorTask 再次提交自己到scheduler 延迟 timeoutMillis 执行。
    当 task 执行超时,重新计算延迟时间( 不允许超过 maxDelay ),再次提交自己到scheduler 延迟执行。
    再来看下run方法的具体实现

    /**
     * Executes one supervised cycle of the wrapped task and then re-schedules itself.
     *
     * <p>The task is submitted to {@code executor} and awaited for at most
     * {@code timeoutMillis}. On timely completion the next delay is reset to
     * {@code timeoutMillis}; on timeout the delay is doubled, capped at {@code maxDelay}.
     * Rejections and any other failure are logged and counted without changing the delay.
     * In every case the pending future is cancelled and, unless the scheduler has been
     * shut down, this supervisor is scheduled again after the current delay.
     */
    public void run() {
        Future<?> pending = null;
        try {
            // Hand the sub-task to the worker pool.
            pending = executor.submit(task);
            // Publish the pool's active thread count right after submission.
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            // Wait until the sub-task finishes or the timeout elapses.
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            // Finished in time: go back to the base delay for the next run.
            delay.set(timeoutMillis);
            // Publish the pool's active thread count again after completion.
            threadPoolLevelGauge.set((long) executor.getActiveCount());
        } catch (TimeoutException timedOut) {
            logger.error("task supervisor timed out", timedOut);
            timeoutCounter.increment();

            // Back off: double the delay, never exceeding maxDelay. The CAS leaves the
            // value untouched if it was changed concurrently since it was read.
            final long previousDelay = delay.get();
            delay.compareAndSet(previousDelay, Math.min(maxDelay, previousDelay * 2));

        } catch (RejectedExecutionException rejected) {
            final boolean shuttingDown = executor.isShutdown() || scheduler.isShutdown();
            if (shuttingDown) {
                logger.warn("task supervisor shutting down, reject the task", rejected);
            } else {
                logger.error("task supervisor rejected the task", rejected);
            }

            rejectedCounter.increment();
        } catch (Throwable failure) {
            final boolean shuttingDown = executor.isShutdown() || scheduler.isShutdown();
            if (shuttingDown) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.error("task supervisor threw an exception", failure);
            }

            throwableCounter.increment();
        } finally {
            // Cancel the sub-task if it was submitted (interrupting it if still running).
            if (pending != null) {
                pending.cancel(true);
            }

            // Schedule the next supervised run unless the scheduler is shut down.
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
    

    续租应用实例信息的请求,映射到 InstanceResource 的 renewLease 方法,看下具体的实现:

    /**
     * Handles a PUT request that renews (heartbeats) the lease of this instance.
     *
     * <p>Returns 404 when the registry cannot renew the lease. Otherwise, when a
     * {@code lastDirtyTimestamp} is supplied and the server is configured to sync on
     * timestamp differences, the response produced by {@code validateDirtyTimestamp}
     * is returned; in all other cases 200 OK is returned.
     *
     * @param isReplication      replication header value; the request is treated as coming
     *                           from a replica node only when it equals {@code "true"}
     * @param overriddenStatus   overridden status sent by the caller, may be {@code null}
     * @param status             instance status sent by the caller (not used by this method)
     * @param lastDirtyTimestamp last dirty timestamp sent by the caller, may be {@code null};
     *                           parsed with {@link Long#valueOf(String)}, so a non-numeric
     *                           value raises {@link NumberFormatException}
     * @return the HTTP response describing the outcome of the renewal
     */
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // Renew the lease in the registry.
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Renewal failed.
        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }

        // Compare against the InstanceInfo's lastDirtyTimestamp.
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else { // Success.
            response = Response.ok().build();
        }
        // Pass the app name as a log argument (not concatenated onto the pattern) so that
        // all three placeholders are filled with app name, instance id and status.
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
    

    调用 AbstractInstanceRegistry的renew方法,续租应用实例信息,看下具体的实现

    /**
     * Renews the lease of the given instance.
     *
     * <p>Looks up the lease by application name and instance id. When no lease exists, or
     * the computed overridden status is {@code UNKNOWN}, the renewal is counted as
     * "not found" and {@code false} is returned. Otherwise the instance status is aligned
     * with the computed overridden status (without marking the instance dirty), the
     * per-minute renewal counter is incremented and the lease itself is renewed.
     *
     * @param appName       name of the application the instance belongs to
     * @param id            id of the instance whose lease is renewed
     * @param isReplication whether this renewal is a replication from another node
     * @return {@code true} if the lease was renewed, {@code false} otherwise
     */
    public boolean renew(String appName, String id, boolean isReplication) {
        // Count the renewal attempt for monitoring.
        RENEW.increment(isReplication);
        // Look up the lease.
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        // No lease for this instance.
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                // Compute the status the instance should have.
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                // Status is UNKNOWN: the lease cannot be renewed.
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                // Align the instance status with the computed overridden status.
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    // Log the computed status that is actually compared and applied below,
                    // not the override stored on the instance. It is passed as an object
                    // so that a null value cannot raise an exception here.
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            overriddenInstanceStatus,
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            // Count this renewal in the per-minute counter.
            renewsLastMin.increment();
            // Update the lease's last-update timestamp (the actual renewal).
            leaseToRenew.renew();
            return true;
        }
    }
    

    调用 Lease的renew方法,设置租约最后更新时间( 续租 ),看下具体的实现

    /**
     * Renews the lease by setting {@code lastUpdateTimestamp} to the current time
     * plus {@code duration}.
     *
     * NOTE(review): the stored value is offset by {@code duration} rather than being the
     * raw update time; confirm that any expiry check reading this field accounts for it.
     */
    public void renew() {
        final long now = System.currentTimeMillis();
        lastUpdateTimestamp = now + duration;
    }
    

    整个过程只修改了租约的过期时间,即使存在并发请求,也不会影响数据的一致性,因此不需要加锁。
    Eureka续租的操作就完成了。
    TimedSupervisorTask的分析就到这里了。

    相关文章

      网友评论

          本文标题:Eureka源码分析(六) TimedSupervisorTask

          本文链接:https://www.haomeiwen.com/subject/ltnwxqtx.html