美文网首页
Spring cloud euraka

Spring cloud euraka

作者: 奋斗的韭菜汪 | 来源:发表于2020-07-24 18:01 被阅读0次

    常用的服务注册中心:euraka、Nacos、Zookeeper、Etcd、Consul
    比较注册中心差异从这些点出发:
    1、服务动态感知实现方式(push/pull),2、存储(是否支持持久化),3、高可用机制(集群特性:是否支持选主、一致性问题),4cap特性(cp、ap)
    ,5api的提供形式(http协议、netty通信)
    延伸:cap解释
    c(强一致性)
    a(可用性)
    p(分区容器性)(要满足这一点必须是集群节点/跨区域高可用)
    为什么只能满足cp或者ap
    在满足p的情况下,例如集群节点,比如一个请求,为了实现强一致性,则必须让请求等待,等待情况下,则不满足可用性,反之同理说明ap。
    Euraka:非持久化存储基于内存,(ap)最终一致性、集群节点角色是一样的
    Nacos:一致性raft算法(redis-sentinael/nacos选举)long-poll
    Zookeeper:zab协议 push
    Consul:一致性raft算法(redis-sentinael/nacos选举)、long-poll
    Etcd:一致性raft算法(redis-sentinael/nacos选举)long-poll
    服务注册中心的作用:1、服务上下线动态感知(根据心跳的结果剔除服务) 2、服务调用维护方便
    消费者服务获取euraka服务列表的方式:euraka注册中心push主动推送,pull定时轮询拉取(存在延迟)
    延伸:long-pull(长轮询):客户端发送请求到服务端,服务端hold住请求,等待,直到需要响应时才返回响应

    Eureka Server
    Eureka Client
    Eureka自我保护机制
    心跳失败的比例15分钟之内,高于15%的节点,Eureka任务这个服务出现了故障,直接剔除这个问题服务;
    这样做的好处:避免网络抖动或者网络不稳定的情况下,误删了服务
    Eureka自我保护机制的实现原理
    AbstractInstanceRegistry
    protected volatile int numberOfRenewsPerMinThreshold; //每一分钟最小的续约数量
    protected volatile int expectedNumberOfClientsSendingRenews; //预期每分钟收到的续约的客户端数量

    配置eureka心跳失败服务不可用评判标准(默认为0.85)

    eureka.server.renewal-percent-threshold: 0.5

    源码分析的核心功能
    1、Eureka Server如何接收请求?
    server端必须实现监听,可以是http+容器(tomcat,jobss)的实现方式,也可以是socket(NIO,Netty)的方式
    Eureka Server服务访问的类ApplicationsResource/ApplicationResource(类似于controller角色),使用的jersey框架
    Jersey是一个RESTFUL请求服务JAVA框架
    2、Eureka Client如何注册?
    核心方法EurekaServiceRegistry->register
    spring的SmartLifecycle接口,实现这个接口重写start方法(可以在spring容器启动后执行start方法内容)
    DisCoveryClient.register -> AbstractJerseyEurekaHttpClient.register

        @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            ClientResponse response = null;
            try {
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .header("Accept-Encoding", "gzip")
                        .type(MediaType.APPLICATION_JSON_TYPE)
                        .accept(MediaType.APPLICATION_JSON)
                        .post(ClientResponse.class, info);
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }
    

    客户端发送的请求最终会到eureka服务端 ApplicationResource.addInstance内,新增注册实例
    3、Eureka Server如何存储服务地址?
    请求到达服务端ApplicationResource.addInstance内,会调用

        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
           //先注册到一个服务节点
            super.register(info, leaseDuration, isReplication);
            //再复制到其他节点完成完成注册
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
    

    调用父类的register方法中可以看到服务实例信息InstanceInfo会缓存在ConcurrentHashMap中
    Eureka三级缓存(实现读写分离,解决竞争提高性能)


    image.png

    服务端节点间地址复制(通过调用node节点的register完成复制)

        private void replicateInstanceActionsToPeers(Action action, String appName,
                                                     String id, InstanceInfo info, InstanceStatus newStatus,
                                                     PeerEurekaNode node) {
            try {
                InstanceInfo infoFromRegistry;
                CurrentRequestVersion.set(Version.V2);
                switch (action) {
                    case Cancel:
                        node.cancel(appName, id);
                        break;
                    case Heartbeat:
                        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                        break;
                    case Register:
                        node.register(info);
                        break;
                    case StatusUpdate:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                        break;
                    case DeleteStatusOverride:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.deleteStatusOverride(appName, id, infoFromRegistry);
                        break;
                }
            } catch (Throwable t) {
                logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
            } finally {
                CurrentRequestVersion.remove();
            }
        }
    

    4、Eureka Client如何查询地址?
    (1)DiscoveryClient构造方法中会触发
    (2)每三十秒执行一次的定时任务DiscoveryClient中cacheRefreshTask
    cacheRefreshTask中的衰减任务实现:第一次是30s执行,若失败,第二次60s重试,若再失败180秒重试

            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                cacheRefreshTask = new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                );
                scheduler.schedule(
                        cacheRefreshTask,
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    

    TimedSupervisorTask

        @Override
        public void run() {
            Future<?> future = null;
            try {
                future = executor.submit(task);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
                delay.set(timeoutMillis);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                successCounter.increment();
            } catch (TimeoutException e) {
                logger.warn("task supervisor timed out", e);
                timeoutCounter.increment();
    
                long currentDelay = delay.get();
                //延时时间重计算
                long newDelay = Math.min(maxDelay, currentDelay * 2);
                delay.compareAndSet(currentDelay, newDelay);
    
            } catch (RejectedExecutionException e) {
                if (executor.isShutdown() || scheduler.isShutdown()) {
                    logger.warn("task supervisor shutting down, reject the task", e);
                } else {
                    logger.warn("task supervisor rejected the task", e);
                }
    
                rejectedCounter.increment();
            } catch (Throwable e) {
                if (executor.isShutdown() || scheduler.isShutdown()) {
                    logger.warn("task supervisor shutting down, can't accept the task");
                } else {
                    logger.warn("task supervisor threw an exception", e);
                }
    
                throwableCounter.increment();
            } finally {
                if (future != null) {
                    future.cancel(true);
                }
    
                if (!scheduler.isShutdown()) {
                   //执行定时任务
                    scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
                }
            }
        }
    

    相关文章

      网友评论

          本文标题:Spring cloud euraka

          本文链接:https://www.haomeiwen.com/subject/wqmjlktx.html