美文网首页
剖析Eureka实现细节

剖析Eureka实现细节

作者: 一生逍遥一生 | 来源:发表于2019-10-17 13:34 被阅读0次

本文基于的spring-boot-starter-parent:2.1.8.RELEASE,spring-cloud-starter-netflix-eureka-server:2.1.3.RELEASE,spring-cloud-starter-netflix-eureka-client:2.1.3.RELEASE。
Eureka符合AP(可用性、容错性)理论。

Eureka注册中心

启动过程分析

使用@EnableEurekaServer来标记启动注册中心功能。

package org.springframework.cloud.netflix.eureka.server;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.Import;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}
  1. 注解@EnableDiscoveryClient

    Spring Cloud中discovery service有很多中实现(eureka、consul、zookeeper
    dubbo),@EnableDiscoveryClient是开启Spring Cloud 服务发现客户端的注解,没有说明是开启Eureka客户端,
    是因为开启开启Eureka客户端de注解是@EnableEurekaClient。@EnableDiscoveryClient
    是存在与spring-cloud-commons.jar,@EnableEurekaClient存在与spring-cloud-netflix-eureka-client.jar中
    如果使用dubbo作为服务发现,可以使用@EnableDubboClient注解。
  2. @Import(EurekaServerMarkerConfiguration.class)用于对激活的bean进行标记。
    EurekaServerAutoConfiguration分析(org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration):
    a.在EurekaServerMarkerConfiguration.Marker存在的前提下,才能执行这个类。
    b.通过EnableConfigurationProperties来导入配置类:
    EurekaDashboardProperties.java
属性名 含义 默认值
eureka.dashboard.path dashboard的路径 /
eureka.dashboard.enable 是否开启dashboard true

InstanceRegistryProperties.java

属性名 含义 默认值
eureka.server.expectedNumberOfRenewsPerMin 每分钟期望续约的数量 1
eureka.server.defaultOpenForTrafficCount 默认打开的通信数量 1

3.@Import(EurekaServerInitializerConfiguration.class):导入一个启动配置类
4.@PropertySource("classpath:/eureka/server.properties"):指定要导入配置的路径

接收Eureka客户端请求

接收注册信息

com.netflix.eureka.resources.ApplicationResource#addInstance

 /**
     * Registers information about a particular instance for an
     * {@link com.netflix.discovery.shared.Application}.
     */
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        .....
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

入口的作用:1.接收eureka客户端的注册请求,完成服务实例向注册中心的注册;2.接收其他注册节点的同步信息,完成节点间服务列表的同步工作。

注册过程

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register

/**
     * Registers the information about the {@link InstanceInfo} and replicates
     * this information to all peer eureka nodes. If this is replication event
     * from other replica nodes then it is not replicated.
     * @param info
     *            the {@link InstanceInfo} to be registered and replicated.
     * @param isReplication
     *            true if this is a replication event from other replica nodes,
     *            false otherwise.
     */
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

com.netflix.eureka.registry.AbstractInstanceRegistry#register

 /**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
           ...省略代码...
        } finally {
            read.unlock();
        }
    }

在进行服务注册时,接口会在Service Provider启动时被调用来实现服务注册。
步骤解析:1.ApplicationResource类接收Http服务请求,调用PeerAwareInstanceRegistryImpl的方法;2.PeerAwareInstanceRegistryImpl完成服务注册后,
调用replicateToPeers向其他Eureka Server节点做状态同步。
在进行注册操作时,使用了ReentrantReadWriteLock,注册registry是一个ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>,
第一层ConcurrentHashMap的key是app name(应用名字),第二层hashmap的key是instanceinstance name(实例名字)。

同步到注册中心其他节点

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl

/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
/**
 * Replicates all instance changes to peer eureka nodes except for
 * replication traffic to this node.
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

接收到Service Provider请求的Eureka Server,把请求再次转发到其它的Eureka Server,调用同样的接口,传入同样的参数,除了会在header中标记isReplication=true,从而避免重复的replicate。Peer之间的状态是采用异步的方式同步的,所以不保证节点间的状态一定是一致的,不过基本能保证最终状态是一致的。

续约

com.netflix.eureka.resources.InstanceResource

/**
 * A put request for renewing lease from a client instance.
 */
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // Not found in the registry, immediately ask for a register
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    // Check if we need to sync based on dirty time stamp, the client
    // instance might have changed some value
    Response response;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}

Renew(服务续约)操作由Service Provider定期调用,类似于heartbeat。主要是用来告诉Eureka Server Service Provider还活着,避免服务被剔除掉。首先更新自身状态,再同步到其它Peer。

Cancle

com.netflix.eureka.cluster.PeerEurekaNode

/**
 * Send the cancellation information of an instance to the node represented
 * by this class.
 */
public void cancel(final String appName, final String id) throws Exception {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    batchingDispatcher.process(
            taskId("cancel", appName, id),
            new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                @Override
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.cancel(appName, id);
                }

                @Override
                public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                    super.handleFailure(statusCode, responseEntity);
                    if (statusCode == 404) {
                        logger.warn("{}: missing entry.", getTaskName());
                    }
                }
            },
            expiryTime
    );
}

Cancel(服务下线)一般在Service Provider shut down的时候调用,用来把自身的服务从Eureka Server中删除,以防客户端调用不存在的服务。

Fetch Registries

Fetch Registries由Service Consumer调用,用来获取Eureka Server上注册的服务。
为了提高性能,服务列表在Eureka Server会缓存一份,同时每30秒更新一次。

Eviction

Eviction(失效服务剔除)用来定期(默认为每60秒)在Eureka Server检测失效的服务,检测标准就是超过一定时间没有Renew的服务。
默认失效时间为90秒,也就是如果有服务超过90秒没有向Eureka Server发起Renew请求的话,就会被当做失效服务剔除掉。
失效时间可以通过eureka.instance.leaseExpirationDurationInSeconds进行配置,定期扫描时间可以通过eureka.server.evictionIntervalTimerInMs进行配置。

发现节点

Eureka Server在启动后会调用EurekaClientConfig.getEurekaServerServiceUrls来获取所有的Peer节点,并且会定期更新。
定期更新频率可以通过eureka.server.peerEurekaNodesUpdateIntervalMs配置。
这个方法的默认实现是从配置文件读取,所以如果Eureka Server节点相对固定的话,可以通过在配置文件中配置来实现。

Service Provider 实现细节

Register

Service Provider要对外提供服务,一个很重要的步骤就是把自己注册到Eureka Server上。
只需要在启动时和实例状态变化时调用Eureka Server的接口注册即可。需要注意的是,需要确保配置eureka.client.registerWithEureka=true。

Renew

Renew操作会在Service Provider端定期发起,用来通知Eureka Server自己还活着。
配置如下:
1.instance.leaseRenewalIntervalInSeconds
Renew频率。默认是30秒,也就是每30秒会向Eureka Server发起Renew操作。
2.instance.leaseExpirationDurationInSeconds
服务失效时间。默认是90秒,也就是如果Eureka Server在90秒内
没有接收到来自Service Provider的Renew操作,就会把Service Provider剔除。

Cancel

在Service Provider服务shut down的时候,需要及时通知Eureka Server把自己剔除,从而避免客户端调用已经下线的服务。逻辑本身比较简单,通过对方法标记@PreDestroy,从而在服务shut down的时候会被触发。

发现EurekaServer

默认从配置文件读取,如果需要更灵活的控制,可以通过override getEurekaServerServiceUrls方法来提供自己的实现。
定期更新频率可以通过eureka.client.eurekaServiceUrlPollIntervalSeconds配置。

Service Consumer实现细节

Fetch Service Registries

Service Consumer在启动时会从Eureka Server获取所有服务列表,并在本地缓存。需要注意的是,需要确保配置eureka.client.shouldFetchRegistry=true。

Update Service Registries

由于在本地有一份缓存,所以需要定期更新,定期更新频率可以通过eureka.client.registryFetchIntervalSeconds配置。

Eureka Server端多级缓存机制

Eureka Server为了避免同时读写内存数据结构造成的并发冲突问题,采用了多级缓存机制来进一步提升服务请求的响应速度。

拉取注册表的步骤:
1.首先从ReadOnlyCacheMap里查缓存的注册表。
2.若没有,就找ReadWriteCacheMap里缓存的注册表。
3.如果还没有,就从内存中获取实际的注册表数据。
注册表发生变更的步骤:
1.会在内存中更新变更的注册表数据,同时过期掉ReadWriteCacheMap。
2.此过程不会影响ReadOnlyCacheMap提供人家查询注册表。
3.一段时间内(默认30秒),各服务拉取注册表会直接读ReadOnlyCacheMap。
4.30秒过后,Eureka Server的后台线程发现ReadWriteCacheMap已经清空了,也会清空ReadOnlyCacheMap中的缓存。
5.下次有服务拉取注册表,又会从内存中获取最新的数据了,同时填充各个缓存。
多级缓存机制的优点是什么?
1.尽可能保证了内存注册表数据不会出现频繁的读写冲突问题。
3.并且进一步保证对Eureka Server的大量请求,都是快速从纯内存走,性能极高。

自保护

自保护的作用是防止出现网络分区期间服务器之间的注册表中的信息不一致。Eureka遵从AP理论。Eureka的自保护功能,当Eureka服务器没有收到超过特定阈值的心跳时,它们就会停止从注册表中终止实例。Eureka Client在进行注册的时候,会记录到第一个Eureka Server服务器上,然后Eureka之间进行传递数据。

相关配置:https://cloud.spring.io/spring-cloud-static/spring-cloud.html

eureka.instance.lease-renewal-interval-in-seconds = 30 # 租期更新间隔为30秒,3次心跳
eureka.instance.lease-expiration-duration-in-seconds = 90 #租期过期间隔为90秒
eureka.server.eviction-interval-timer-in-ms = 60 * 1000 # 1分钟不接受到心跳,将这个实例剔除
eureka.server.renewal-percent-threshold = 0.85 # 超过85%的服务器没有收到心跳,进入自保护环境
eureka.server.renewal-threshold-update-interval-ms = 15 * 60 * 1000 
eureka.server.enable-self-preservation = true

健康监控

获取所有服务实例信息:http://{ip}:{port}/eureka/apps
获取某一服务实例信息:http://{ip}:{port}/eureka/apps/{service-name}
修改某一实例的状态信息:http://{ip}:{port}/eureka/apps/{service-name}/{instance id}/status?value={status}
开启状态检查:eureka.client.healthcheck.enabled=true
健康检查需要配置的项:eureka.instance.status-page-url-path、eureka.instance.status-page-url、
eureka.instance.health-check-url、eureka.instance.health-check-url-path。

节点之间网络中断

可能会出现下面情况:
1.节点之间的心跳失败,节点进入自保护模式。
2.注册可能发生在孤立的服务器上,有些客户端可能会反映新的注册,而其他客户端则可能没有。

为什么不用HA 代理做负载均衡

1.粘性会话。大多数场景不需要粘性会话。如果有粘性会话,Eureka有弹性,HA proxy没有弹性。

为什么不用Zookeeper作为服务注册

Eureka功能:
1.注册表信息节点之间传递。
2.REST端点处理注册、续订、到期和取消。
3.能够抵挡客户端和服务器之间以及同级之间的网络中断。
4.使实例信息保持最新状态,以应对复杂的EIP绑定,部署回滚和弹性扩展。
Zookeeper功能:领导者选举、有序更新、分布式同步机器一致性。
Zookeeper不能处理EIP问题,Zookeeper失败的时候,需要借助外力来启动。
Eureka不需要依赖外部组件、大部分服务依赖Eureka自己、这样可以减少复杂性、避免其他故障点。

自定义健康检查

package com.edu.discoveryserver.health;

import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health.Builder;
import org.springframework.stereotype.Component;
public class CustomHealthIndicator extends AbstractHealthIndicator{
    @Override
    protected void doHealthCheck(Builder builder) throws Exception {
        builder.up()
                .withDetail("details", "My custom health indicator");
    }
}

代码地址:https://github.com/yishengxiaoyao/spring-cloud-eureka-demo

参考文献

@EnableDiscoveryClient与@EnableEurekaClient区别
Microservices : Service Registration and Discover in Netflix Eureka
Spring Cloud Eureka 源码分析(一) 服务端启动过程
深度剖析服务发现组件Netflix Eureka
Eureka源码剖析
The Mystery of Eureka Health Monitoring
The Mystery of Eureka self-preservation
Spring Cloud中,Eureka常见问题总结
Eureka Clustering documentation and best practices
http://www.itmuch.com/spring-cloud/finchley-6/
为什么不应该使用ZooKeeper做服务发现
FAQ
Health Check API in Spring Boot Application
Spring Cloud实战小贴士:健康检查

相关文章

网友评论

      本文标题:剖析Eureka实现细节

      本文链接:https://www.haomeiwen.com/subject/atimmctx.html