美文网首页
Eureka源码剖析之一:初始化-启动

Eureka源码剖析之一:初始化-启动

作者: 搬运工来架构 | 来源:发表于2020-01-16 21:38 被阅读0次

    Eureka启动的过程有client端和server端, Eureka client端入口是DiscoveryClient类, Eureka server端入口是EurekaBootStrap类, 接下来我们就从源码看下它们做了什么吧!

    〓Eureka Client端启动

    1)看下DiscoveryClient类图: 

    由此看出DiscoveryClient实现了EurekaClient、LookupService接口,并且定义了内部类:DiscoverClientOptionalArgs,可选参数类,源码里实现为空,是默认实现,具体的需要去查看AbstractDiscoveryClientOptionalArgs这个抽象类;EurekaTransport类,封装了Client请求的类;CacheFreshThread,刷新缓存线程,提供定时拉取服务列表等;HeartbeatThread,心跳线程,提供定时向服务端续约服务等。

    // DiscoveryClient类是一个单例,实现了EurekaClient接口

    @Singleton

    publicclassDiscoveryClientimplementsEurekaClient{

    // DiscoveryClient类的构造函数

    @Inject// 构造器注入

    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,

    Provider backupRegistryProvider) {

    if(args !=null) {

    this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;

    this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;

    this.eventListeners.addAll(args.getEventListeners());

    this.preRegistrationHandler = args.preRegistrationHandler;

    }else{

    this.healthCheckCallbackProvider =null;

    this.healthCheckHandlerProvider =null;

    this.preRegistrationHandler =null;

    }

    this.applicationInfoManager = applicationInfoManager;

    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;

    staticClientConfig = clientConfig;

    transportConfig = config.getTransportConfig();

    instanceInfo = myInfo;

    if(myInfo !=null) {

    appPathIdentifier = instanceInfo.getAppName() +"/"+ instanceInfo.getId();

    }else{

    logger.warn("Setting instanceInfo to a passed in null value");

    }

    this.backupRegistryProvider = backupRegistryProvider;

    this.urlRandomizer =newEndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);

    localRegionApps.set(newApplications());

    // 拉取服务计数器:单调地增加生成计数器,以确保陈旧的线程不会将注册表重置为旧版本。

    fetchRegistryGeneration =newAtomicLong(0);

    remoteRegionsToFetch =newAtomicReference(clientConfig.fetchRegistryForRemoteRegions());

    remoteRegionsRef =newAtomicReference<>(remoteRegionsToFetch.get() ==null?null: remoteRegionsToFetch.get().split(","));

    // 过时注册统计

    if(config.shouldFetchRegistry()) {

    this.registryStalenessMonitor =newThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX +"lastUpdateSec_",newlong[]{15L,30L,60L,120L,240L,480L});

    }else{

    this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;

    }

    // 过时心跳统计

    if(config.shouldRegisterWithEureka()) {

    this.heartbeatStalenessMonitor =newThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX +"lastHeartbeatSec_",newlong[]{15L,30L,60L,120L,240L,480L});

    }else{

    this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;

    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    // 既不需要注册到Eureka也不拉取注册服务

    if(!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {

    logger.info("Client configured to neither register nor query for data.");

    scheduler =null;

    heartbeatExecutor =null;

    cacheRefreshExecutor =null;

    eurekaTransport =null;

    instanceRegionChecker =newInstanceRegionChecker(newPropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()

    // to work with DI'd DiscoveryClient

    DiscoveryManager.getInstance().setDiscoveryClient(this);

    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();

    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",

    initTimestampMs,this.getApplications().size());

    return;// no need to setup up an network tasks and we are done

    }

    try{

    // 定义了2个线程大小的定时线程池:一个是刷新缓存CacheFreshThread,一个是心跳线程HeartbeatThread

    scheduler = Executors.newScheduledThreadPool(2,

    newThreadFactoryBuilder()

    .setNameFormat("DiscoveryClient-%d")

    .setDaemon(true)

    .build());

    // 心跳线程池

    heartbeatExecutor =newThreadPoolExecutor(

    1, clientConfig.getHeartbeatExecutorThreadPoolSize(),0, TimeUnit.SECONDS,

    newSynchronousQueue(),

    newThreadFactoryBuilder()

    .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")

    .setDaemon(true)

    .build()

    );// use direct handoff

    // 刷新缓存线程池

    cacheRefreshExecutor =newThreadPoolExecutor(

    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(),0, TimeUnit.SECONDS,

    newSynchronousQueue(),

    newThreadFactoryBuilder()

    .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")

    .setDaemon(true)

    .build()

    );// use direct handoff

    // 实例化EurekaTransport

    eurekaTransport =newEurekaTransport();

    scheduleServerEndpointTask(eurekaTransport, args);

    AzToRegionMapper azToRegionMapper;

    if(clientConfig.shouldUseDnsForFetchingServiceUrls()) {

    azToRegionMapper =newDNSBasedAzToRegionMapper(clientConfig);

    }else{

    azToRegionMapper =newPropertyBasedAzToRegionMapper(clientConfig);

    }

    if(null!= remoteRegionsToFetch.get()) {

    azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));

    }

    instanceRegionChecker =newInstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());

    }catch(Throwable e) {

    thrownewRuntimeException("Failed to initialize DiscoveryClient!", e);

    }

    // 在启动时需要拉取注册服务列表,增量拉取之后如果失败就会从备份里面再次拉取

    if(clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {

    fetchRegistryFromBackup();

    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started

    if(this.preRegistrationHandler !=null) {

    this.preRegistrationHandler.beforeRegistration();

    }

    // 初始化定时任务

    initScheduledTasks();

    try{

    // 监控DiscoverClient

    Monitors.registerObject(this);

    }catch(Throwable e) {

    logger.warn("Cannot register timers", e);

    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()

    // to work with DI'd DiscoveryClient

    DiscoveryManager.getInstance().setDiscoveryClient(this);

    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();

    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",

    initTimestampMs,this.getApplications().size());

    }

    /**

    * Initializes all scheduled tasks.

    */

    privatevoidinitScheduledTasks(){

    // 需要拉取注册服务,则定时拉取刷新缓存CacheRefreshThread,可以看#Eureka服务拉取 一篇

    if(clientConfig.shouldFetchRegistry()) {

    // registry cache refresh timer

    // 默认30秒,定时拉取服务

    intregistryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();

    intexpBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();

    scheduler.schedule(

    newTimedSupervisorTask(

    "cacheRefresh",

    scheduler,

    cacheRefreshExecutor,

    registryFetchIntervalSeconds,

    TimeUnit.SECONDS,

    expBackOffBound,

    newCacheRefreshThread()

    ),

    registryFetchIntervalSeconds, TimeUnit.SECONDS);

    }

    // 需要注册到Eureka,则定时心跳请求服务端保持客户端存活,即服务续约。可以看#Eureka服务续约 一篇

    if(clientConfig.shouldRegisterWithEureka()) {

    // 默认30秒,定时进行服务续约

    intrenewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();

    intexpBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();

    logger.info("Starting heartbeat executor: "+"renew interval is: "+ renewalIntervalInSecs);

    // Heartbeat timer

    scheduler.schedule(

    newTimedSupervisorTask(

    "heartbeat",

    scheduler,

    heartbeatExecutor,

    renewalIntervalInSecs,

    TimeUnit.SECONDS,

    expBackOffBound,

    newHeartbeatThread()

    ),

    renewalIntervalInSecs, TimeUnit.SECONDS);

    // InstanceInfo replicator 当前实例节点复制器实例化

    instanceInfoReplicator =newInstanceInfoReplicator(

    this,

    instanceInfo,

    // 实例信息复制间隔,默认30秒

    clientConfig.getInstanceInfoReplicationIntervalSeconds(),

    2);// burstSize

    // 状态变化监听器

    statusChangeListener =newApplicationInfoManager.StatusChangeListener() {

    @Override

    publicStringgetId()

    {

    return"statusChangeListener";

    }

    @Override

    publicvoidnotify(StatusChangeEvent statusChangeEvent)

    {

    if(InstanceStatus.DOWN == statusChangeEvent.getStatus() ||

    InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {

    // log at warn level if DOWN was involved

    logger.warn("Saw local status change event {}", statusChangeEvent);

    }else{

    logger.info("Saw local status change event {}", statusChangeEvent);

    }

    instanceInfoReplicator.onDemandUpdate();

    }

    };

    if(clientConfig.shouldOnDemandUpdateStatusChange()) {

    applicationInfoManager.registerStatusChangeListener(statusChangeListener);

    }

    // 实例节点复制器启动

    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

    }else{

    logger.info("Not registering with Eureka server per configuration");

    }

    }

    }

    classInstanceInfoReplicatorimplementsRunnable{

    // 启动,延迟40秒启动

    publicvoidstart(intinitialDelayMs){

    // CAS保证启动一次

    if(started.compareAndSet(false,true)) {

    instanceInfo.setIsDirty();// for initial register

    // 定时任务定时调用实例信息复制器(线程),逻辑看run方法

    Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);

    scheduledPeriodicRef.set(next);

    }

    }

    publicvoidrun(){

    try{

    // 刷新实例信息

    discoveryClient.refreshInstanceInfo();

    // 获取实例信息的脏时间戳,如果存在则进行服务注册,服务注册可以看 #Eureka服务注册 一篇

    Long dirtyTimestamp = instanceInfo.isDirtyWithTime();

    if(dirtyTimestamp !=null) {

    //Client进行注册操作

    discoveryClient.register();

    instanceInfo.unsetIsDirty(dirtyTimestamp);

    }

    }catch(Throwable t) {

    logger.warn("There was a problem with the instance info replicator", t);

    }finally{

    // 定时每30秒进行刷新或注册请求

    Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);

    scheduledPeriodicRef.set(next);

    }

    }

    }

    Eureka Client启动时会开启三个定时任务

    ①刷新缓存定时服务,即定时拉取服务列表,默认每30秒进行定时拉取服务列表;同时②开启心跳线程定时服务,即定时向服务端进行服务续约,默认每30秒进行定时续约。③启动实例信息复制器进行刷新服务实例信息或服务注册请求。

    〓Eureka Server端启动

    EurekaBootStrap是server端启动入口,

    PeerAwareInstanceRegistryImpl是真正的核心类,我们看下其类图:

    // 继承Servlet上下文监听器,说明Eureka Server是基于Servlet

    publicclassEurekaBootStrapimplementsServletContextListener{

    ...

    @Override

    publicvoidcontextInitialized(ServletContextEvent event){

    try{

    // 初始化环境

    initEurekaEnvironment();

    // 初始化Euraka Server Context

    initEurekaServerContext();

    ServletContext sc = event.getServletContext();

    sc.setAttribute(EurekaServerContext.class.getName(), serverContext);

    }catch(Throwable e) {

    logger.error("Cannot bootstrap eureka server :", e);

    thrownewRuntimeException("Cannot bootstrap eureka server :", e);

    }

    }

    /**

    * init hook for server context. Override for custom logic.

    */

    protectedvoidinitEurekaServerContext()throwsException{

    // Eureka Server读取配置

    EurekaServerConfig eurekaServerConfig =newDefaultEurekaServerConfig();

    // For backward compatibility

    JsonXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

    XmlXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

    logger.info("Initializing the eureka client...");

    logger.info(eurekaServerConfig.getJsonCodecName());

    // Server默认加载编码JSON、XML

    ServerCodecs serverCodecs =newDefaultServerCodecs(eurekaServerConfig);

    ApplicationInfoManager applicationInfoManager =null;

    // EurekaClient初始化

    if(eurekaClient ==null) {

    EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())

    ?newCloudInstanceConfig()

    :newMyDataCenterInstanceConfig();

    applicationInfoManager =newApplicationInfoManager(

    instanceConfig,newEurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

    EurekaClientConfig eurekaClientConfig =newDefaultEurekaClientConfig();

    eurekaClient =newDiscoveryClient(applicationInfoManager, eurekaClientConfig);

    }else{

    applicationInfoManager = eurekaClient.getApplicationInfoManager();

    }

    // 如果是使用AWS平台,这里不涉及

    PeerAwareInstanceRegistry registry;

    if(isAws(applicationInfoManager.getInfo())) {

    registry =newAwsInstanceRegistry(

    eurekaServerConfig,

    eurekaClient.getEurekaClientConfig(),

    serverCodecs,

    eurekaClient

    );

    awsBinder =newAwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);

    awsBinder.start();

    }else{

    // 初始化集群实例,父类AbstractInstanceRegistry 

    registry =newPeerAwareInstanceRegistryImpl(

    eurekaServerConfig,

    eurekaClient.getEurekaClientConfig(),

    serverCodecs,

    eurekaClient

    );

    }

    // 初始化集群节点实例

    PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(

    registry,

    eurekaServerConfig,

    eurekaClient.getEurekaClientConfig(),

    serverCodecs,

    applicationInfoManager

    );

    // 初始化EurekaSeverContext实例

    serverContext =newDefaultEurekaServerContext(

    eurekaServerConfig,

    serverCodecs,

    registry,

    peerEurekaNodes,

    applicationInfoManager

    );

    // 使用非注入的方式Holder保持EurekaServerContext实例

    EurekaServerContextHolder.initialize(serverContext);

    // EurekaServerContext初始化

    serverContext.initialize();

    logger.info("Initialized server context");

    // 复制注册列表到集群上的其它节点

    intregistryCount = registry.syncUp();

    registry.openForTraffic(applicationInfoManager, registryCount);

    // Register all monitoring statistics.

    EurekaMonitors.registerAllStats();

    }

    // 初始化获得集群节点

    protectedPeerEurekaNodesgetPeerEurekaNodes(PeerAwareInstanceRegistry registry, EurekaServerConfig eurekaServerConfig, EurekaClientConfig eurekaClientConfig, ServerCodecs serverCodecs, ApplicationInfoManager applicationInfoManager){

    PeerEurekaNodes peerEurekaNodes =newPeerEurekaNodes(

    registry,

    eurekaServerConfig,

    eurekaClientConfig,

    serverCodecs,

    applicationInfoManager

    );

    returnpeerEurekaNodes;

    }

    ...

    }

    @Singleton

    publicclassPeerAwareInstanceRegistryImplextendsAbstractInstanceRegistryimplementsPeerAwareInstanceRegistry{

    /**

    * 从eureka节点peer封装注册信息,进行节点信息的注册同步,如果操作失败则会进入重试

    * Populates the registry information from a peer eureka node. This

    * operation fails over to other nodes until the list is exhausted if the

    * communication fails.

    */

    @Override

    publicintsyncUp(){

    // Copy entire entry from neighboring DS node

    intcount =0;

    // 默认最大5次进行同步注册

    for(inti =0; ((i < serverConfig.getRegistrySyncRetries()) && (count ==0)); i++) {

    if(i >0) {

    try{

    // 重试等待30秒

    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());

    }catch(InterruptedException e) {

    logger.warn("Interrupted during registry transfer..");

    break;

    }

    }

    Applications apps = eurekaClient.getApplications();

    for(Application app : apps.getRegisteredApplications()) {

    for(InstanceInfo instance : app.getInstances()) {

    try{

    if(isRegisterable(instance)) {

    // 节点注册

    register(instance, instance.getLeaseInfo().getDurationInSecs(),true);

    count++;

    }

    }catch(Throwable t) {

    logger.error("During DS init copy", t);

    }

    }

    }

    }

    returncount;

    }

    @Override

    publicvoidopenForTraffic(ApplicationInfoManager applicationInfoManager,intcount){

    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.

    // 每分钟服务续约的数量:每个节点服务续约每30秒一次,那么多个节点需要count*2次

    this.expectedNumberOfRenewsPerMin = count *2;

    // 服务续约最小百分阈值默认为0.85。即最小阈值为最小预约数乘以0.85

    this.numberOfRenewsPerMinThreshold =

    (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());

    logger.info("Got "+ count +" instances from neighboring DS node");

    logger.info("Renew threshold is: "+ numberOfRenewsPerMinThreshold);

    this.startupTime = System.currentTimeMillis();

    if(count >0) {

    this.peerInstancesTransferEmptyOnStartup =false;

    }

    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();

    booleanisAws = Name.Amazon == selfName;

    if(isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {

    logger.info("Priming AWS connections for all replicas..");

    primeAwsReplicas(applicationInfoManager);

    }

    logger.info("Changing status to UP");

    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);

    // 调用父类的定时启动剔除任务定时启动剔除任务,一旦达到剔除条件则会调用服务下线接口,可以看 #Eureka服务下线 一篇

    super.postInit();

    }

    }

    // PeerAwareInstanceRegistryImpl父类

    publicabstractclassAbstractInstanceRegistryimplementsInstanceRegistry{

    ...

    /**

    * Create a new, empty instance registry.

    */

    protectedAbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs){

    this.serverConfig = serverConfig;

    this.clientConfig = clientConfig;

    this.serverCodecs = serverCodecs;

    // 近期下线队列recentCanceledQueue

    this.recentCanceledQueue =newCircularQueue>(1000);

    // 近期注册队列recentRegisteredQueue

    this.recentRegisteredQueue =newCircularQueue>(1000);

    // 近期1分钟的续约计量统计任务

    this.renewsLastMin =newMeasuredRate(1000*60*1);

    // 定时任务:定期清除近期变更队列

    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),

    serverConfig.getDeltaRetentionTimerIntervalInMs(),

    serverConfig.getDeltaRetentionTimerIntervalInMs());

    }

    privateTimerTaskgetDeltaRetentionTask(){

    returnnewTimerTask() {

    @Override

    publicvoidrun(){

    Iterator it = recentlyChangedQueue.iterator();

    while(it.hasNext()) {

    if(it.next().getLastUpdateTime() <

    System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {

    it.remove();

    }else{

    break;

    }

    }

    }

    };

    }

    ...

    // 定时启动剔除任务

    protectedvoidpostInit(){

    renewsLastMin.start();

    if(evictionTaskRef.get() !=null) {

    evictionTaskRef.get().cancel();

    }

    evictionTaskRef.set(newEvictionTask());

    evictionTimer.schedule(evictionTaskRef.get(),

    serverConfig.getEvictionIntervalTimerInMs(),

    serverConfig.getEvictionIntervalTimerInMs());

    }

    /**

    * Registers a new instance with a given duration.

    * 给定一个租期时间注册一个新的实例

    *@seecom.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)

    */

    publicvoidregister(InstanceInfo registrant,intleaseDuration,booleanisReplication){

    try{

    read.lock();

    // 通过appname获取实例注册数据

    Map> gMap = registry.get(registrant.getAppName());

    // 注册计数器+1

    REGISTER.increment(isReplication);

    // 如果实例对应数据不存在,则进行初始化

    if(gMap ==null) {

    finalConcurrentHashMap> gNewMap =newConcurrentHashMap>();

    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);

    if(gMap ==null) {

    gMap = gNewMap;

    }

    }

    // 通过实例id获得实例租约信息,如果租约信息存在,那么会比较LastDirtyTimestamp,如果租约信息大于传进来的实例的LastDirtyTimestamp,那么则直接将使用缓存中的注册实例

    Lease existingLease = gMap.get(registrant.getId());

    // Retain the last dirty timestamp without overwriting it, if there is already a lease

    if(existingLease !=null&& (existingLease.getHolder() !=null)) {

    Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();

    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();

    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

    // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted

    // InstanceInfo instead of the server local copy.

    if(existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {

    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"+

    " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");

    registrant = existingLease.getHolder();

    }

    }else{

    // 租约信息不存在则作为一个新的注册

    synchronized(lock) {

    if(this.expectedNumberOfRenewsPerMin >0) {

    // Since the client wants to cancel it, reduce the threshold

    // (1

    // for 30 seconds, 2 for a minute)

    this.expectedNumberOfRenewsPerMin =this.expectedNumberOfRenewsPerMin +2;

    this.numberOfRenewsPerMinThreshold =

    (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());

    }

    }

    logger.debug("No previous lease information found; it is new registration");

    }

    // 根据注册者和租约时间,实例化租约实例化信息

    Lease lease =newLease(registrant, leaseDuration);

    if(existingLease !=null) {

    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());

    }

    gMap.put(registrant.getId(), lease);

    // 将注册者添加到最新注册队列

    synchronized(recentRegisteredQueue) {

    recentRegisteredQueue.add(newPair(

    System.currentTimeMillis(),

    registrant.getAppName() +"("+ registrant.getId() +")"));

    }

    // This is where the initial state transfer of overridden status happens

    // 注册者重写状态不为UNKNOWN,并且重写状态Map不包含实例,则将其put到Map中

    if(!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {

    logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "

    +"overrides", registrant.getOverriddenStatus(), registrant.getId());

    if(!overriddenInstanceStatusMap.containsKey(registrant.getId())) {

    logger.info("Not found overridden id {} and hence adding it", registrant.getId());

    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());

    }

    }

    InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());

    if(overriddenStatusFromMap !=null) {

    logger.info("Storing overridden status {} from map", overriddenStatusFromMap);

    registrant.setOverriddenStatus(overriddenStatusFromMap);

    }

    // Set the status based on the overridden status rules

    InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);

    registrant.setStatusWithoutDirty(overriddenInstanceStatus);

    // If the lease is registered with UP status, set lease service up timestamp

    // 如果租约被注册后为UP状态,那么标记服务启动时间戳,而且只是首次才会进行设置

    if(InstanceStatus.UP.equals(registrant.getStatus())) {

    lease.serviceUp();

    }

    registrant.setActionType(ActionType.ADDED);

    // 将租约存放到近期变更队列

    recentlyChangedQueue.add(newRecentlyChangedItem(lease));

    registrant.setLastUpdatedTimestamp();

    // 使对应实例缓存失效

    invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());

    logger.info("Registered instance {}/{} with status {} (replication={})",

    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);

    }finally{

    read.unlock();

    }

    }

    }

    @Singleton

    publicclassDefaultEurekaServerContextimplementsEurekaServerContext{

    ...

    @PostConstruct

    @Override

    publicvoidinitialize()throwsException{

    logger.info("Initializing ...");

    peerEurekaNodes.start();

    registry.init(peerEurekaNodes);

    logger.info("Initialized");

    }

    ...

    }

    // 集群节点:定时更新集群节点

    @Singleton

    publicclassPeerEurekaNodes{

    ...

    publicvoidstart(){

    taskExecutor = Executors.newSingleThreadScheduledExecutor(

    newThreadFactory() {

    @Override

    publicThreadnewThread(Runnable r){

    Thread thread =newThread(r,"Eureka-PeerNodesUpdater");

    thread.setDaemon(true);

    returnthread;

    }

    }

    );

    try{

    // 预先更新集群节点

    updatePeerEurekaNodes(resolvePeerUrls());

    Runnable peersUpdateTask =newRunnable() {

    @Override

    publicvoidrun(){

    try{

    updatePeerEurekaNodes(resolvePeerUrls());

    }catch(Throwable e) {

    logger.error("Cannot update the replica Nodes", e);

    }

    }

    };

    taskExecutor.scheduleWithFixedDelay(

    peersUpdateTask,

    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),

    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),

    TimeUnit.MILLISECONDS

    );

    }catch(Exception e) {

    thrownewIllegalStateException(e);

    }

    for(PeerEurekaNode node : peerEurekaNodes) {

    logger.info("Replica node URL:  "+ node.getServiceUrl());

    }

    }

    /**

    * Given new set of replica URLs, destroy {@linkPeerEurekaNode}s no longer available, and

    * create new ones. 新增集群备份节点,移除不再可用的节点,并且创建新的node节点

    *

    *@paramnewPeerUrls peer node URLs; this collection should have local node's URL filtered out

    */

    protectedvoidupdatePeerEurekaNodes(List<String> newPeerUrls){

    // 参数newPeerUrls为空则不处理

    if(newPeerUrls.isEmpty()) {

    logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");

    return;

    }

    // 即将关闭的节点集合

    Set toShutdown =newHashSet<>(peerEurekaNodeUrls);

    // 因为新增的节点可能在即将关闭的节点集合内,所以先从中移除。

    toShutdown.removeAll(newPeerUrls);

    // 即将新增的节点集合

    Set toAdd =newHashSet<>(newPeerUrls);

    // 从新增节点集合中移除本地节点集合

    toAdd.removeAll(peerEurekaNodeUrls);

    // 如果即将关闭的节点集合为空并且即将新增的节点也为空,则说明没有变化,不需要处理,立即返回。

    if(toShutdown.isEmpty() && toAdd.isEmpty()) {// No change

    return;

    }

    // 移除不再可能的节点,当前包含全部已有节点

    List newNodeList =newArrayList<>(peerEurekaNodes);

    // 即将关闭的节点集合不为空,则将当前节点集合中移除对应节点

    if(!toShutdown.isEmpty()) {

    logger.info("Removing no longer available peer nodes {}", toShutdown);

    inti =0;

    while(i < newNodeList.size()) {

    PeerEurekaNode eurekaNode = newNodeList.get(i);

    if(toShutdown.contains(eurekaNode.getServiceUrl())) {

    // 移除节点,节点进行关闭shutdown操作

    newNodeList.remove(i);

    eurekaNode.shutDown();

    }else{

    i++;

    }

    }

    }

    // 新增节点

    if(!toAdd.isEmpty()) {

    logger.info("Adding new peer nodes {}", toAdd);

    for(String peerUrl : toAdd) {

    newNodeList.add(createPeerEurekaNode(peerUrl));

    }

    }

    // 重新赋值

    this.peerEurekaNodes = newNodeList;

    this.peerEurekaNodeUrls =newHashSet<>(newPeerUrls);

    }

    // 实时获取排除自身节点所剩下集群的节点地址

    protectedListresolvePeerUrls(){

    InstanceInfo myInfo = applicationInfoManager.getInfo();

    String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);

    List replicaUrls = EndpointUtils

    .getDiscoveryServiceUrls(clientConfig, zone,newEndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

    intidx =0;

    while(idx < replicaUrls.size()) {

    if(isThisMyUrl(replicaUrls.get(idx))) {

    replicaUrls.remove(idx);

    }else{

    idx++;

    }

    }

    returnreplicaUrls;

    }

    ...

    }

    1)Eureka Server端启动入口类是继承ServletContextListener的EurekaBoostrap,首先会初始化eureka环境和初始化上下文,初始化上下文时会进行EurekaServer配置的初始化、JSON、XML编码器转化注册、初始化集群注册实例、和集群节点实例等。

    2)调用EurekaServerContext上下文进行初始化:启动默认每10分钟定时更新集群节点数据、响应缓存初始化、启动默认每15分钟定时任务更新服务续约最小期望数量和最小续约阈值。

    3)集群节点的同步注册:支持失败最大5次重试,进行集群节点间的相互注册。初始化服务续约最小期望数量和最小续约阈值。

    4)服务端注册表维护了近期下线环形队列recentCanceledQueue、近期注册环形队列recentRegisteredQueue、近期变化队列recentlyChangeQueue。

    5)启动默认每1分钟定时任务EvictionTask从最新变化队列中清除过期项(内存记录节点数据)。

     总 结 

    〓Eureka client

    启动核心类DiscoveryClient,启动时会开启三个定时任务:

    ①刷新缓存定时服务,即定时拉取服务列表,默认每30秒进行定时拉取服务列表;同时②开启心跳线程定时服务,即定时向服务端进行服务续约,默认每30秒进行定时续约。③启动实例信息复制器进行刷新服务实例信息或服务注册请求。

    〓Eureka Server

    启动入口类是EurekaBootStrap,核心类是PeerAwareInstanceRegistryImpl,

    初始化:首先会初始化eureka环境和初始化上下文,初始化上下文时会进行EurekaServer配置的初始化、JSON、XML编码器转化注册、初始化集群注册实例、和集群节点实例等。

    更新节点信息定时任务、定时更新续约数据:调用EurekaServerContext上下文进行初始化:启动默认每10分钟定时更新集群节点数据、响应缓存initializedResponseCache初始化、启动默认每15分钟定时任务更新服务续约最小期望数量和最小续约阈值。

    集群同步:集群节点的同步注册:支持失败最大5次重试,进行集群节点间的相互注册。

    初始化服务续约最小期望数量和最小续约阈值。

    本地缓存数据:服务端注册表维护了近期下线环形队列recentCanceledQueue、近期注册环形队列recentRegisteredQueue、近期变化队列recentlyChangeQueue。

    剔除任务:启动默认每1分钟定时任务EvictionTask从最新变化队列中清除过期项(内存记录节点数据)。

    这里只是针对Eureka启动初始化做了简要剖析,更多详细的篇章请看后面的分析。

    PS:若哪里写的有误或者不明白的,请多多指教!

    -关注搬运工来架构,与优秀的你一同进步-

    相关文章

      网友评论

          本文标题:Eureka源码剖析之一:初始化-启动

          本文链接:https://www.haomeiwen.com/subject/oheqzctx.html