美文网首页
Eureka源码分析(二) DiscoveryClient

Eureka源码分析(二) DiscoveryClient

作者: skyguard | 来源:发表于2018-11-09 09:51 被阅读0次

    之前我们分析了eureka client的配置,那么现在我们再来分析一下EurekaClient。DiscoveryClient,实现 EurekaClient 接口,用于与 Eureka-Server 交互。实现如下方法:

    向 Eureka-Server 注册自身服务
    向 Eureka-Server 续约自身服务
    当关闭时,向 Eureka-Server 取消自身服务
    从 Eureka-Server 查询应用集合和应用实例信息

    我们来看一下DiscoveryClient的构造方法

    /**
     * Builds the client and performs its whole start-up sequence inside the constructor.
     *
     * <p>Steps, in order: copy the optional collaborators from {@code args}, store the
     * configuration objects, create the local application cache and the staleness monitors,
     * then either finish early (when the configuration asks for neither registration nor
     * registry fetching) or create the executors, the transport and the region checker,
     * fetch the registry (falling back to the backup registry when the fetch reports
     * failure), run the pre-registration handler, start the scheduled tasks, register the
     * monitors and finally publish this instance through {@code DiscoveryManager}.
     *
     * @param applicationInfoManager source of the local {@code InstanceInfo}
     * @param config                 client configuration; also provides the transport configuration
     * @param args                   optional collaborators; may be {@code null}, in which case the
     *                               health-check providers and pre-registration handler are set to {@code null}
     * @param backupRegistryProvider provider of the backup registry, stored for later use
     * @throws RuntimeException if creating the executors, the transport or the region checker throws
     */
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        // [3.2.1] Copy the optional collaborators from AbstractDiscoveryClientOptionalArgs
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
    
        // [3.2.2] Store ApplicationInfoManager and EurekaClientConfig
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();
    
        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); // no business purpose; only used for logging
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }
    
        // [3.2.3] Store the BackupRegistry provider
        this.backupRegistryProvider = backupRegistryProvider;
    
        // [3.2.4] Initialize InstanceInfoBasedUrlRandomizer. TODO: clarify what it is used for
        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    
        // [3.2.5] Initialize the local cache of the application collection (starts out empty)
        localRegionApps.set(new Applications());
    
        fetchRegistryGeneration = new AtomicLong(0);
    
        // [3.2.6] Determine which remote regions' registry information to fetch
        // (comma-separated string in the config; the split form is kept alongside it)
        remoteRegionsToFetch = new AtomicReference<>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
        // [3.2.7] Initialize the staleness monitors for registry fetch and heartbeat;
        // a no-op metric is used when the corresponding feature is disabled
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
    
        // [3.2.8] Finish initialization early when no interaction with Eureka-Server is needed
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());
    
            return;  // no need to setup up an network tasks and we are done
        }
    
        try {
            // [3.2.9] Initialize the thread pools
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
    
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
    
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
    
            // [3.2.10] Initialize the Eureka network-communication components
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);
    
            // [3.2.11] Initialize InstanceRegionChecker (DNS-based or property-based zone-to-region mapping)
            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
    
        // [3.2.12] Fetch registry information from Eureka-Server; when the fetch reports
        // failure, fall back to the backup registry
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }
    
        // [3.2.13] Run the handler that must execute before registering with Eureka-Server
        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }
    
        // [3.2.14] Initialize the scheduled tasks
        initScheduledTasks();
    
        // [3.2.15] Register monitoring with Servo; a failure is logged and otherwise ignored
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }
    
        // [3.2.16] Initialization complete
        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }
    

    在DiscoveryClient的构造方法里完成了整个初始化的过程。BackupRegistry,备份注册中心接口。当 Eureka-Client 启动时,如果无法从 Eureka-Server 读取注册信息,则从备份注册中心读取注册信息。

    /**
     * {@link BackupRegistry} implementation that supplies no backup data:
     * both lookups return {@code null}.
     */
    public class NotImplementedRegistryImpl implements BackupRegistry {

        /**
         * @return always {@code null}
         */
        @Override
        public Applications fetchRegistry() {
            return null;
        }

        /**
         * @param includeRemoteRegions not used by this implementation
         * @return always {@code null}
         */
        @Override
        public Applications fetchRegistry(String[] includeRemoteRegions) {
            return null;
        }
    }
    从 NotImplementedRegistryImpl可以看出,目前 Eureka-Client 未提供合适的默认实现。
    TransportClientFactories,生成 Jersey 客户端工厂的工厂接口。目前有 Jersey1TransportClientFactories 、Jersey2TransportClientFactories 两个实现。TransportClientFactories有一个newTransportClientFactory方法

    /**
     * Creates a {@code TransportClientFactory}.
     *
     * @param clientConfig      the Eureka client configuration
     * @param additionalFilters extra filters of the implementation-specific type {@code F}
     *                          (presumably applied to the underlying HTTP client; confirm against the implementations)
     * @param myInstanceInfo    instance info, by its name that of the local instance (confirm against the implementations)
     * @return the transport client factory
     */
    TransportClientFactory newTransportClientFactory(final EurekaClientConfig clientConfig,
                                                     final Collection<F> additionalFilters,
                                                     final InstanceInfo myInstanceInfo);
    

    通过该方法创建 TransportClientFactory。
    EurekaEventListener,Eureka 事件监听器。代码为

    /**
     * Callback for events raised by the {@link EurekaClient}.
     */
    public interface EurekaEventListener {

        /**
         * Receives an event that occurred within the {@link EurekaClient}.
         *
         * <p>The call is made from the context of an internal eureka thread, so
         * implementations must return as quickly as possible and must not block.
         *
         * @param event the event being delivered
         */
        void onEvent(EurekaEvent event);
    }
    我们来看一下DiscoveryClient初始化定时任务的方法

    /**
     * Starts the client's background work according to the client configuration.
     *
     * <ul>
     *   <li>When registry fetching is enabled, schedules the supervised "cacheRefresh" task.</li>
     *   <li>When registration is enabled, schedules the supervised "heartbeat" task, creates the
     *       {@code InstanceInfoReplicator} and the status-change listener, registers that listener
     *       only if on-demand status updates are enabled, and starts the replicator.</li>
     *   <li>Otherwise logs that the client will not register.</li>
     * </ul>
     */
    private void initScheduledTasks() {
        // Executor for fetching registry information from Eureka-Server
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        // Executor for heartbeats (lease renewal) to Eureka-Server
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // renewal frequency
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); // back-off bound
            // Parameterized form, consistent with the other log calls; the emitted message is unchanged.
            logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // Create the instance-info replicator
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            // Create the listener for local instance status changes
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    // Any status change triggers an on-demand update of the replicator
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            // Register the status-change listener only when on-demand updates are enabled
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            // Start the instance-info replicator
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
    

    向 Servo 注册监控

    // Excerpt from the constructor: register this client with Servo monitoring.
    // Any Throwable is caught and only logged as a warning.
    try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }
    

    初始化完成

    // Excerpt from the constructor: expose this client and its config through
    // DiscoveryManager.getInstance(), then record and log the initialization timestamp.
    DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    

    DiscoveryClient的分析就到这里了。

    相关文章

      网友评论

          本文标题:Eureka源码分析(二) DiscoveryClient

          本文链接:https://www.haomeiwen.com/subject/ysdwxqtx.html