美文网首页
深入 Eureka 服务注册 源码分析(二)

深入 Eureka 服务注册 源码分析(二)

作者: sharedCode | 来源:发表于2018-08-08 10:21 被阅读0次

    Eureka-Client注册服务

    啥时候会注册

    在两种情况下客户端会主动向服务端发送自己的注册信息

    1.当客户端的instance信息发生改变时,Eureka-Client和Server端信息不一致时

    2.当客户端刚刚启动的时候。

    定时器注册

    com.netflix.discovery.DiscoveryClient ,使用的@Inject //google guice 注入遵循 JSR-330规范

    private void initScheduledTasks() {
        //省略, 刷新缓存的定时器
        if (clientConfig.shouldRegisterWithEureka()) {
           //省略, 发送心跳的定时器
            // 监听instance的状态变更
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
        
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            // 注册监听
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // 开启实例信息复制器
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
    

    initScheduledTasks() 方法是在DiscoverClient的构造函数初始化的时候被调用。

    主要作用就是:

    1.开启缓刷新定时器

    2.开启发送心跳的定时器

    3.开启实例instance状态变更监听

    4.开启应用状态复制器(主要就是为了开启一个定时线程,每40秒判断实例信息是否变更,如果变更了则重新注册)

    //InstanceInfoReplicator.java
    class InstanceInfoReplicator implements Runnable {
    
        public void start(int initialDelayMs) {
            if (started.compareAndSet(false, true)) {
                // 首次进来设置一下。
                instanceInfo.setIsDirty();  // for initial register
                // 开启定时线程 , 每停顿initialDelayMs秒执行一次该任务。 服务注册也是由这个任务完成
                Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
        // show down
        public void stop() {
            scheduler.shutdownNow();
            started.set(false);
        }
        // 这个方法主要是在上面提到的监听器里面被调用。
        public boolean onDemandUpdate() {
            if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        logger.debug("Executing on-demand update of local InstanceInfo");
                        // 这个地方用来获取定时线程的执行Future,如果该线程还没有执行完毕,则取消掉,释放资源,因为下面也会执行run方法
                        Future latestPeriodic = scheduledPeriodicRef.get();
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
                            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                            latestPeriodic.cancel(false);
                        }
        
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            } else {
                logger.warn("Ignoring onDemand update due to rate limiter");
                return false;
            }
        }
        
        public void run() {
            try {
                // 刷新实例信息。
                discoveryClient.refreshInstanceInfo();
                // 判断实例信息是否不一致
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    // 注册自己的服务
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    

    从上面可以看到InstanceInfoReplicator是一个负责服务注册的线程任务, 有两个地方可以执行这个任务

    1.定时线程,每40秒执行一次。

    2.当instance的状态发生变更(除去DOWN这个状态)的时候,会有statusChangeListener 这个监听器监听到

    去执行服务注册。

    PS: 现在网上有些文章,说服务启动后,要隔40秒才会去注册自己的服务,这个说法是错误的。 当应用刚刚启动的时候,注册服务不是依赖那个定时线程去跑的,而是在EurekaAutoServiceRegistration这个类里面

    自动注册

    在EurekaClientAutoConfiguration这个自动配置类里面,有下面这一段代码

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
       return new EurekaAutoServiceRegistration(context, registry, registration);
    }
    

    上面这段代码,很简单,就是实例化了一个Bean,主要是这个Bean实现了SmartLifecycle, 有这个接口标识的在spring 容器加载完所有的Bean之后会执行该类的start方法, 下面可以详细看看这个代码。

    //EurekaAutoServiceRegistration.java
    public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {
    
       private static final Log log = LogFactory.getLog(EurekaAutoServiceRegistration.class);
    
       private AtomicBoolean running = new AtomicBoolean(false);
    
       private int order = 0;
    
       private AtomicInteger port = new AtomicInteger(0);
    
       private ApplicationContext context;
    
       private EurekaServiceRegistry serviceRegistry;
    
       private EurekaRegistration registration;
    
       public EurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry serviceRegistry, EurekaRegistration registration) {
          this.context = context;
          this.serviceRegistry = serviceRegistry;
          this.registration = registration;
       }
    
       @Override
       public void start() {
          // 端口配置
          if (this.port.get() != 0 && this.registration.getNonSecurePort() == 0) {
             this.registration.setNonSecurePort(this.port.get());
          }
        
          // 没有在运行
          if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
             // 重点就这这里,这里是主动去注册。
             this.serviceRegistry.register(this.registration);
            // 发布 节点注册事件
             this.context.publishEvent(
                   new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
             this.running.set(true);
          }
       }
    // EurekaServiceRegistry.java
    @Override
    public void register(EurekaRegistration reg) {
       maybeInitializeClient(reg);
    
       if (log.isInfoEnabled()) {
          log.info("Registering application " + reg.getInstanceConfig().getAppname()
                + " with eureka with status "
                + reg.getInstanceConfig().getInitialStatus());
       }
       // 设置当前实例的instanceStatus, 一旦这个实例的状态发生改变,
       // 只要状态不是DOWN,那么就会被监听器监听到,最终执行服务注册
       reg.getApplicationInfoManager()
             .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
        // 健康检查器不为空
       if (reg.getHealthCheckHandler() != null) {
          // 设置进去
          reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
       }
    }
    

    总结: 服务注册分为两种,

    第一种: 当应用启动的时候,如果应用开启了自动注册(默认开启), 那么在自动配置类加载的时候,会通过EurekaAutoServiceRegistration实例化的时候,去改变instance的status, 最终被监听器监听到,执行服务注册的代码

    第二种: 主要应用于启动之后,当应用的信息发生改变之后,每40每秒执行一次的线程,检测到了,也会自动去注册一次。

    DiscoveryClient.register()

    //DiscoveryClient.java
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            // 发起HTTP请求
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }
    

    使用的Jersey框架来完成http的请求调用

    //AbstractJerseyEurekaHttpClient.java
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
    

    POST 请求 Eureka-Server 的 apps/${APP_NAME} 接口,参数为 InstanceInfo ,实现注册实例信息的注册。

    Eureka-Server接收注册

    ApplicationResource

    接下来可以看一下,服务端是接收到请求之后是如何处理的。

    程序入口: com.netflix.eureka.resources.ApplicationResource.addInstance()

    //ApplicationResource.java
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        // 参数校验,不符合验证规则的,返回400状态码,此处不做详解
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }
        
        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
        // 重点在这里
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
    

    PeerAwareInstanceRegistryImpl

    上面的register方法,最终调用的是PeerAwareInstanceRegistryImpl的方法

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // 租约的过期时间,默认90秒,也就是说当服务端超过90秒没有收到客户端的心跳,则主动剔除该节点。
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            // 如果客户端自定义了,那么以客户端为准
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // 节点注册
        super.register(info, leaseDuration, isReplication);
        // 复制到同等服务节点上去
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    

    由上面可以知道,调用的是父类的register方法, 其父类是AbstractInstanceRegistry ,在了解具体的注册方法之前,

    需要先了解一下Lease这个对象,因为Eureka-Server最终处理注册信息的时候,都会转化为这个对象来处理。

    Lease

    public class Lease<T> {
    
        enum Action {
            Register, Cancel, Renew
        };
        
        public static final int DEFAULT_DURATION_IN_SECS = 90;
        
        private T holder;
        private long evictionTimestamp;
        private long registrationTimestamp;
        private long serviceUpTimestamp;
        
        private volatile long lastUpdateTimestamp;
        private long duration;
        
        public Lease(T r, int durationInSecs) {
            holder = r;
            registrationTimestamp = System.currentTimeMillis();
            lastUpdateTimestamp = registrationTimestamp;
            //durationInSecs为秒单位, 换算成毫秒
            duration = (durationInSecs * 1000);
        
        }
        
        // 客户端续约时,更新最后的更新时间 , 用当前系统加上过期的时间
        public void renew() {
            lastUpdateTimestamp = System.currentTimeMillis() + duration;
        
        }
    
       // 服务下线时,更新服务下线时间
        public void cancel() {
            if (evictionTimestamp <= 0) {
                evictionTimestamp = System.currentTimeMillis();
            }
        }
    
    
        public void serviceUp() {
            if (serviceUpTimestamp == 0) {
                serviceUpTimestamp = System.currentTimeMillis();
            }
        }
    
    
        public void setServiceUpTimestamp(long serviceUpTimestamp) {
            this.serviceUpTimestamp = serviceUpTimestamp;
        }
    
    
        public boolean isExpired() {
            return isExpired(0l);
        }
    
    
        public boolean isExpired(long additionalLeaseMs) {
            return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
        }
    }
    

    DEFAULT_DURATION_IN_SECS : 租约过期的时间常量,默认未90秒,也就说90秒没有心跳过来,那么这边将会自动剔除该节点holder :这个租约是属于谁的, 目前占用这个属性的是 instanceInfo,也就是客户端实例信息。evictionTimestamp : 租约是啥时候过期的,当服务下线的时候,会过来更新这个时间戳registrationTimestamp : 租约的注册时间
    serviceUpTimestamp :服务启动时间 ,当客户端在注册的时候,instanceInfo的status 为UP的时候,则更新这个时间戳

    public void serviceUp() {
            if (serviceUpTimestamp == 0) {
                serviceUpTimestamp = System.currentTimeMillis();
            }
    }
    

    lastUpdateTimestamp :最后更新时间,每次续约的时候,都会更新这个时间戳,在判断实例

    是否过期时,需要用到这个属性。

    duration:过期时间,毫秒单位

    AbstractInstanceRegistry

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // 上只读锁
            read.lock();
            // 从本地MAP里面获取当前实例的信息。
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            // 增加注册次数到监控信息里面去。
            REGISTER.increment(isReplication);
            if (gMap == null) {
                // 如果第一次进来,那么gMap为空,则创建一个ConcurrentHashMap放入到registry里面去
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                // putIfAbsent方法主要是在向ConcurrentHashMap中添加键—值对的时候,它会先判断该键值对是否已经存在。
                // 如果不存在(新的entry),那么会向map中添加该键值对,并返回null。
                // 如果已经存在,那么不会覆盖已有的值,直接返回已经存在的值。
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    // 表明map中确实不存在,则设置gMap为最新创建的那个
                    gMap = gNewMap;
                }
            }
            // 从MAP中查询已经存在的Lease信息 (比如第二次来)
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // 当Lease的对象不为空时。
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // 当instance已经存在是,和客户端的instance的信息做比较,时间最新的那个,为有效instance信息
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // server
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();   // client
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // 这里只有当existinglease不存在时,才会进来。 像那种恢复心跳,信息过期的,都不会进入这里。
                //  Eureka-Server的自我保护机制做的操作,为每分钟最大续约数+2 ,同时重新计算每分钟最小续约数
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // 构建一个最新的Lease信息
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                // 当原来存在Lease的信息时,设置他的serviceUpTimestamp, 保证服务开启的时间一直是第一次的那个
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // 放入本地Map中
            gMap.put(registrant.getId(), lease);
            // 添加到最近的注册队列里面去,以时间戳作为Key, 名称作为value,主要是为了运维界面的统计数据。
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            // 分析instanceStatus
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
        
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        
            // If the lease is registered with UP status, set lease service up timestamp
            // 得到instanceStatus,判断是否是UP状态,
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            // 设置注册类型为添加
            registrant.setActionType(ActionType.ADDED);
            // 租约变更记录队列,记录了实例的每次变化, 用于注册信息的增量获取、
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            // 清理缓存 ,传入的参数为key
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
    

    相关文章

      网友评论

          本文标题:深入 Eureka 服务注册 源码分析(二)

          本文链接:https://www.haomeiwen.com/subject/ltstbftx.html