美文网首页
微服务注册及发现

微服务注册及发现

作者: minute_5 | 来源:发表于2019-12-11 20:32 被阅读0次

    微服务注册及发现

    Netflix官方对于Eureka的架构图


    eurekaArchitecture.png

    这里简单解释一下这个图:

    1. 对于Eureka来说主要有两个角色:server, client
    2. client又分为两种
      1. service -- 提供服务
      2. client -- 消费服务
      3. 一个client既可以是消费者又可以是生产者
    3. 每个server之间组成集群 -- server之间进行相互注册
    4. 每个client向server注册,将自己的一些客户端信息发送Server,并通过心跳机制维持上线状态
    5. 图中的每一块区域(集群)都需要一个server提供注册服务
    6. client从server上拉取信息,缓存这些信息,然后根据这些信息去调用其他client进行消费
    • 这里先说说 Eureka 的自我保护模式:
      • 当一个新的Eureka Server出现时,它尝试从相邻节点获取所有实例注册表信息。如果获取信息出现问题,Eureka Serve会尝试其他的节点。如果服务器能够成功获取所有实例,则根据该信息设置应该接收的更新阈值。如果有任何时间,Eureka Serve接收到的续约低于为该值配置的百分比(默认为15分钟内低于85%),则服务器开启自我保护模式,即不再剔除注册列表的信息。这样做的好处就是,如果是Eureka Server自身的网络问题,导致Eureka Client的续约不上,Eureka Client的注册列表信息不再被删除,也就是Eureka Client还可以被其他服务消费。

    源码解析

    如何进行服务注册? -- register

    Client

    1. 启动后拉去服务端信息
    2. 开启心跳机制
    3. 发送自身信息到server注册
    dicoverClient关联类图
        private void initScheduledTasks() {
            // 使用任务调度获取注册表信息
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
            
            // 注册client到server
            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                // 开启心跳
                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // InstanceInfo replicator
                // 注册client到server上
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
                ···省略代码
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }
        
        // 细讲讲
        // InstanceInfoReplicator 实现 Runnable
        public void run() {
            try {
                discoveryClient.refreshInstanceInfo();
    
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    
        // 注册方法
        boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                // 通过http请求向Server注册
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        }
    
        // 服务端提供的注册接口
        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {    
            ···省略代码···
            // 这个register下面再说
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }
    

    server

    1. 启动后拉去服务端信息
    2. 开启心跳机制
    3. 发送自身信息到server注册
        // BootStrapContext是eureka server的初始启动类,其中调用initEurekaServerContext
        protected void initEurekaServerContext() throws Exception {
        ···省略···
        PeerAwareInstanceRegistry registry;
                if (isAws(applicationInfoManager.getInfo())) {
                ...
                } else {
                    // 服务注册
                    registry = new PeerAwareInstanceRegistryImpl(
                            eurekaServerConfig,
                            eurekaClient.getEurekaClientConfig(),
                            serverCodecs,
                            eurekaClient
                    );
                }
    
                // 服务高可用
                PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
                        registry,
                        eurekaServerConfig,
                        eurekaClient.getEurekaClientConfig(),
                        serverCodecs,
                        applicationInfoManager
                );
        }
    
        // PeerAwareInstanceRegistryImpl
        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            // 通过父类AbstractInstanceRegistry进行注册
            // 其将注册列表信息放到Map<String, Lease<InstanceInfo>> gMap中
            // 修改instance状态
            super.register(info, leaseDuration, isReplication);
            // 同步到其他Eureka Server的其他Peers节点,遍历循环向所有的Peers节点注册
            // 该方法通过执行一个任务向其他节点同步该注册信息
            // 最终执行类PeerEurekaNodes的register()方法
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
    
        // PeerEurekaNode
        public void register(final InstanceInfo info) throws Exception {
            long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
            batchingDispatcher.process(
                    taskId("register", info),
                    new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                        public EurekaHttpResponse<Void> execute() {
                            // 最终通过http注册
                            return replicationClient.register(info);
                        }
                    },
                    expiryTime
            );
        }
    

    如何进行服务续约? -- reNew

    Client

    上面提到了开启心跳线程 HeartbeatThread

        private class HeartbeatThread implements Runnable {
    
            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }
    
        /**
         * Renew with the eureka service by making the appropriate REST call
         */
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
    

    server

        // 服务端刷新心跳接口
        @PUT
        public Response renewLease(
                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
                @QueryParam("overriddenstatus") String overriddenStatus,
                @QueryParam("status") String status,
                @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
            boolean isFromReplicaNode = "true".equals(isReplication);
            // 这里的renew就和上面服务端的register大同小异了
            boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
            ·······
        }
    

    服务信息获取、下线、剔除都和上面两种方式都类似了

    参考

    https://github.com/Netflix/eureka/wiki

    https://cloud.spring.io/spring-cloud-netflix/reference/html/

    https://www.fangzhipeng.com/springcloud/2017/08/11/eureka-resources.html

    相关文章

      网友评论

          本文标题:微服务注册及发现

          本文链接:https://www.haomeiwen.com/subject/sgdagctx.html