美文网首页分布式系统
Eureka源码解析(二) renew源码分析

Eureka源码解析(二) renew源码分析

作者: nerowu | 来源:发表于2020-10-24 10:29 被阅读0次

    阅读前的思考

    使用netflix eureka做服务管理时,若你只停留在对eureka的概念理解和使用层面,那么你面试时会得到面试官的灵魂拷问,例如:
    1)eureka将服务注册信息存放在哪里?服务注册信息都有哪些内容?
    2)eureka如何做到高可用?底层的通信机制是什么?
    3)心跳机制到底发送些什么内容,有了解吗?
    4)服务注册列表是存在客户端还是服务端?如果多复本数据不一致怎么处理?
    5)若网络故障服务注册失败了,eureka是如何保证注册成功的?
    6)注册,同步,下线,剔除分别是怎么实现的?
    7)为什么刚启动的服务没有即时被eureka发现?对此你还遇到过哪些坑?

    带着这些问题或疑惑,作者决定推出eureka源码解读系列,从众所周知的Eureka功能着手,对register,renew,heartbeat,fetch,剔除/关闭,数据复制等进行源码解读,意在深入理解eureka功能。

    Tip:建议开篇从 Eureka源码解析(一) 开始,之后的文章是基于开篇的分析成果之上进行撰写的。

    Renew client端处理流程

    DiscoveryClient.renew()方法定义的服务续约的具体流程。

    boolean  renew(){
        EurekaHttpResponse<InstanceInfo>httpResponse;
        try{
            //通过sendHeartBeat的方式完成
            httpResponse=eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(),instanceInfo.getId(),instanceInfo,null);
            
            if(httpResponse.getStatusCode()==Status.NOT_FOUND.getStatusCode()){
                REREGISTER_COUNTER.increment();
                long timestamp=instanceInfo.setIsDirtyWithTime();
                boolean success=register();
                if(success){
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode()==Status.OK.getStatusCode();
        }catch(Throwablee){
        }
    }
    

    sendHeartBeat一共有两个类实现了方法,AbstractJerseyEurekaHttpClient,JerseyReplicationClient无论是哪个发送内容者是一样的。

    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName,String id,InstanceInfo info,InstanceStatus overriddenStatus){
        String urlPath="apps/"+appName+'/'+id;
        ……
        WebResourcewebResource=jerseyClient.resource(serviceUrl)
            .path(urlPath)
            .queryParam("status",info.getStatus().toString())
            .queryParam("lastDirtyTimestamp",info.getLastDirtyTimestamp().toString());
        if(overriddenStatus!=null){
            webResource=webResource.queryParam("overriddenstatus",overriddenStatus.name());
        }
        ……
        //http put请求
        response=requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
        ……
    }
    
    public enum InstanceStatus:
    UP,//Ready to receive traffic
    DOWN,//Do not send traffic-health check callback failed
    STARTING,//Just about starting-initializations to be done-do not
    //sendtraffic
    OUT_OF_SERVICE,//Intentionally shut down for traffic
    UNKNOWN;
    

    心跳发送内容:appName+id(instanceId)作为url,参数:status即服务状态,lastDirtyTimestamp 即instance在client端最后被修改的时间戳,overriddenStatus 更新过的服务状态。

    服务续约的调用

    DiscoveryClient 构造方法中调用了initScheduledTasks方法初始化了heartbeatTask 用于执行心跳发送任务,具体任务实现在HeartbeatThread(Runnable)中

    /**
    *Theheartbeattaskthatrenewstheleaseinthegivenintervals.
    */
    private class HeartbeatThread implements Runnable{
    
        public void run(){
            //调用renew
            if(renew()){
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    

    Eureka 默认每隔30秒一次心跳检测具体现实如下:

    //Heartbeat timer
    heartbeatTask = new TimedSupervisorTask(
        "heartbeat",
        scheduler,
        heartbeatExecutor,
        renewalIntervalInSecs,//默认30秒
        TimeUnit.SECONDS,
        expBackOffBound,
        new HeartbeatThread()
    );
    scheduler.schedule(
        heartbeatTask,
            renewalIntervalInSecs,TimeUnit.SECONDS);
    

    总结:renew功能与心跳是绑定在一起发送的。定时心跳实现采用JDK的ScheduledExecutorService,执行任务是调用renew函数,发送的内容为appName+instanceId组成的url,参数是服务器状态status 和 最近更新状态的时间戳。以Http put 请求发送

    Server 端流程处理

    跟register服务处理流程一样,服务器启动后扫描并创建ApplicationResource,根据@Path("{id}")创建InstanceResource并调用@Put修饰的方法。

    @PUT
    public Response renewLease(
    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) StringisReplication,
        @QueryParam("overriddenstatus") StringoverriddenStatus,
        @QueryParam("status") Stringstatus,
        @QueryParam("lastDirtyTimestamp") StringlastDirtyTimestamp){
        
        booleanisFromReplicaNode="true".equals(isReplication);
        booleanisSuccess=registry.renew(app.getName(),id,isFromReplicaNode);
        ……
    

    register.renew指向AbstractInstanceRegistry的renew方法,代码如下:

    public boolean renew(String appName,String id,boolean isReplication){
        RENEW.increment(isReplication);
        Map<String,Lease<InstanceInfo>>gMap=registry.get(appName);
        Lease<InstanceInfo>leaseToRenew=null;
        if(gMap!=null){
            leaseToRenew=gMap.get(id);
        }
        //如果gMap中找不到服务实例返回false,则续租失败。
        if(leaseToRenew==null){
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS:Registry:leasedoesn'texist,registeringresource:{}-{}",appName,id);
            return false;
        }else{
            InstanceInfoinstanceInfo=leaseToRenew.getHolder();
            if(instanceInfo!=null){
                InstanceStatus overriddenInstanceStatus=this.getOverriddenInstanceStatus(instanceInfo,leaseToRenew,isReplication);
                if(overriddenInstanceStatus==InstanceStatus.UNKNOWN){
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if(!instanceInfo.getStatus().equals(overriddenInstanceStatus)){
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            //更新时间戳 lastUpdateTimestamp=System.currentTimeMillis()+duration;
            leaseToRenew.renew();
            return true;
        }
    
    }
    

    此处renew方法在concurrentHashMap<String,Map<String,Lease<InstanceInfo>>> registry中查找(appName,(instanceId,Lease<InstanceInfo>)).如果判断并更新时间Instance的时间戳。如果查询不到,则返回false.意味着被剔除的服务是无法得到续租的。

    eureka没有服务超时一说,每隔30秒一次心跳,服务在map中存在则更新时间戳,90秒没有续租上则执行服务剔除。

    相关文章

      网友评论

        本文标题:Eureka源码解析(二) renew源码分析

        本文链接:https://www.haomeiwen.com/subject/resimktx.html