(6) How to Obtain a Dynamic Server List (ServerList)

Author: guessguess | Published: 2021-11-16 20:06

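    Earlier parts of this series only covered how route rules are mapped to handlers and how a request is ultimately processed. They did not look at how the server list is generated dynamically, so I read through the source code with that question in mind.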

    2. Classes involved

    • 2.1 Server


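      Server class diagram

      Server has two subclasses: DiscoveryEnabledServer and DomainExtractingServer (which itself extends DiscoveryEnabledServer).

    • 2.1.1 DiscoveryEnabledServer
      DiscoveryEnabledServer converts the instance information (InstanceInfo) into a Server.
      Source: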
    public class DiscoveryEnabledServer extends Server{
    
        private final InstanceInfo instanceInfo;
        private final MetaInfo serviceInfo;
    
        public DiscoveryEnabledServer(final InstanceInfo instanceInfo, boolean useSecurePort) {
            this(instanceInfo, useSecurePort, false);
        }
    
        public DiscoveryEnabledServer(final InstanceInfo instanceInfo, boolean useSecurePort, boolean useIpAddr) {
            super(useIpAddr ? instanceInfo.getIPAddr() : instanceInfo.getHostName(), instanceInfo.getPort());
            if(useSecurePort && instanceInfo.isPortEnabled(PortType.SECURE))
                super.setPort(instanceInfo.getSecurePort());
            this.instanceInfo = instanceInfo;
            this.serviceInfo = new MetaInfo() {
                @Override
                public String getAppName() {
                    return instanceInfo.getAppName();
                }
    
                @Override
                public String getServerGroup() {
                    return instanceInfo.getASGName();
                }
    
                @Override
                public String getServiceIdForDiscovery() {
                    return instanceInfo.getVIPAddress();
                }
    
                @Override
                public String getInstanceId() {
                    return instanceInfo.getId();
                }
            };
        }
        
        public InstanceInfo getInstanceInfo() {
            return instanceInfo;
        }
    
        @Override
        public MetaInfo getMetaInfo() {
            return serviceInfo;
        }
    }
    
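    • 2.1.2 DomainExtractingServer
      DomainExtractingServer assigns each instance a zone: it uses the "zone" entry in the instance metadata if present, otherwise approximates the zone from the host name (when that option is enabled), otherwise keeps the zone of the wrapped server.
      Source: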
    class DomainExtractingServer extends DiscoveryEnabledServer {
    
        private String id;
    
        @Override
        public String getId() {
            return id;
        }
    
        @Override
        public void setId(String id) {
            this.id = id;
        }
    
        DomainExtractingServer(DiscoveryEnabledServer server, boolean useSecurePort,
                boolean useIpAddr, boolean approximateZoneFromHostname) {
            // host and port are set in super()
            super(server.getInstanceInfo(), useSecurePort, useIpAddr);
            if (server.getInstanceInfo().getMetadata().containsKey("zone")) {
                setZone(server.getInstanceInfo().getMetadata().get("zone"));
            }
            else if (approximateZoneFromHostname) {
                setZone(ZoneUtils.extractApproximateZone(server.getHost()));
            }
            else {
                setZone(server.getZone());
            }
            setId(extractId(server));
            setAlive(server.isAlive());
            setReadyToServe(server.isReadyToServe());
        }
    
        private String extractId(Server server) {
            if (server instanceof DiscoveryEnabledServer) {
                DiscoveryEnabledServer enabled = (DiscoveryEnabledServer) server;
                InstanceInfo instance = enabled.getInstanceInfo();
                if (instance.getMetadata().containsKey("instanceId")) {
                    return instance.getHostName() + ":"
                            + instance.getMetadata().get("instanceId");
                }
            }
            return super.getId();
        }
    
    }
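
The zone-selection order in the constructor above can be sketched in isolation. This is only an illustration: the names ZoneResolutionSketch, resolveZone and approximateZone are hypothetical, and approximateZone is a stand-in for ZoneUtils.extractApproximateZone under the assumption that it returns the part of the host name after the first dot.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not part of Ribbon or Spring Cloud.
final class ZoneResolutionSketch {

    // Stand-in for ZoneUtils.extractApproximateZone (assumed behaviour):
    // everything after the first '.' in the host, or the host itself if there is no dot.
    static String approximateZone(String host) {
        int dot = host.indexOf('.');
        return dot < 0 ? host : host.substring(dot + 1);
    }

    // Same precedence as the DomainExtractingServer constructor:
    // 1. the "zone" metadata entry, 2. the host-name approximation (if enabled),
    // 3. the zone the wrapped server already has.
    static String resolveZone(Map<String, String> metadata, String host,
                              String currentZone, boolean approximateZoneFromHostname) {
        if (metadata.containsKey("zone")) {
            return metadata.get("zone");
        }
        if (approximateZoneFromHostname) {
            return approximateZone(host);
        }
        return currentZone;
    }

    public static void main(String[] args) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("zone", "zone-a");
        Map<String, String> empty = new HashMap<>();
        System.out.println(resolveZone(metadata, "app1.zone-b.example.com", "unknown", true));
        System.out.println(resolveZone(empty, "app1.zone-b.example.com", "unknown", true));
        System.out.println(resolveZone(empty, "app1.zone-b.example.com", "unknown", false));
    }
}
```

The explicit metadata entry wins over the host-name guess, which makes sense because the guess is only a heuristic.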
    
    
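    • 2.2 ServerList
      The ServerList interface is simple: its job is to return the list of servers.


      ServerList class diagram
    • 2.2.1 AbstractServerList
      This class mainly adds support for filtering: getFilterImpl loads the ServerListFilter named by the NIWSServerListFilterClassName property (ZoneAffinityServerListFilter by default), and that filter is then used to filter the servers.
      Source: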
    public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {   
        public AbstractServerListFilter<T> getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{
            try {
                String niwsServerListFilterClassName = niwsClientConfig
                        .getProperty(
                                CommonClientConfigKey.NIWSServerListFilterClassName,
                                ZoneAffinityServerListFilter.class.getName())
                        .toString();
    
                AbstractServerListFilter<T> abstractNIWSServerListFilter = 
                        (AbstractServerListFilter<T>) ClientFactory.instantiateInstanceWithClientConfig(niwsServerListFilterClassName, niwsClientConfig);
                return abstractNIWSServerListFilter;
            } catch (Throwable e) {
                throw new ClientException(
                        ClientException.ErrorType.CONFIGURATION,
                        "Unable to get an instance of CommonClientConfigKey.NIWSServerListFilterClassName. Configured class:"
                                + niwsClientConfig
                                        .getProperty(CommonClientConfigKey.NIWSServerListFilterClassName), e);
            }
        }
    }
    
    • 2.2.2 ConfigurationBasedServerList
      Loads the server list from configuration.
    • 2.2.3 DiscoveryEnabledNIWSServerList
      Fetches the server list from the registry through the registry client (EurekaClient).
      Source:
    public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{
        @Override
        public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
            return obtainServersViaDiscovery();
        }
    
        private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
            List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
    
            if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
                logger.warn("EurekaClient has not been initialized yet, returning an empty list");
                return new ArrayList<DiscoveryEnabledServer>();
            }
    
            EurekaClient eurekaClient = eurekaClientProvider.get();
            if (vipAddresses!=null){
                for (String vipAddress : vipAddresses.split(",")) {
                    List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                    for (InstanceInfo ii : listOfInstanceInfo) {
                        if (ii.getStatus().equals(InstanceStatus.UP)) {
    
                            if(shouldUseOverridePort){
                                if(logger.isDebugEnabled()){
                                    logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                                }
                                InstanceInfo copy = new InstanceInfo(ii);
    
                                if(isSecure){
                                    ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                                }else{
                                    ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                                }
                            }
    
                            DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                            serverList.add(des);
                        }
                    }
                    if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                        break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                    }
                }
            }
            return serverList;
        }
    }
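
The selection loop in obtainServersViaDiscovery can be illustrated without Eureka. The sketch below is an assumption-laden simplification: the registry is a plain in-memory map, and VipLookupSketch, Instance, upHosts and sampleRegistry are hypothetical names. It keeps only instances that are up and, when prioritization is on, stops at the first VIP address that yields any server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the registry is an in-memory map, not a real EurekaClient.
final class VipLookupSketch {

    static final class Instance {
        final String host;
        final boolean up;

        Instance(String host, boolean up) {
            this.host = host;
            this.up = up;
        }
    }

    static List<String> upHosts(Map<String, List<Instance>> registry,
                                String vipAddresses, boolean prioritizeFirstMatch) {
        List<String> result = new ArrayList<>();
        if (vipAddresses == null) {
            return result;
        }
        for (String vip : vipAddresses.split(",")) {
            List<Instance> instances = registry.get(vip);
            if (instances == null) {
                instances = Collections.emptyList();
            }
            for (Instance instance : instances) {
                if (instance.up) { // only instances reported as UP are kept
                    result.add(instance.host);
                }
            }
            if (!result.isEmpty() && prioritizeFirstMatch) {
                break; // an earlier VIP address already produced servers; skip the rest
            }
        }
        return result;
    }

    static Map<String, List<Instance>> sampleRegistry() {
        Map<String, List<Instance>> registry = new HashMap<>();
        registry.put("vipA", Arrays.asList(new Instance("a1", true), new Instance("a2", false)));
        registry.put("vipB", Arrays.asList(new Instance("b1", true)));
        return registry;
    }

    public static void main(String[] args) {
        System.out.println(upHosts(sampleRegistry(), "vipA,vipB", true));  // [a1]
        System.out.println(upHosts(sampleRegistry(), "vipA,vipB", false)); // [a1, b1]
    }
}
```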
    
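    • 2.2.4 DomainExtractingServerList
      Wraps another ServerList and converts each server discovered through the registry into a DomainExtractingServer, assigning its zone along the way.
      Source: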
    public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
    
        private ServerList<DiscoveryEnabledServer> list;
    
        private final RibbonProperties ribbon;
    
        private boolean approximateZoneFromHostname;
    
        public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
                IClientConfig clientConfig, boolean approximateZoneFromHostname) {
            this.list = list;
            this.ribbon = RibbonProperties.from(clientConfig);
            this.approximateZoneFromHostname = approximateZoneFromHostname;
        }
    
        @Override
        public List<DiscoveryEnabledServer> getInitialListOfServers() {
            List<DiscoveryEnabledServer> servers = setZones(
                    this.list.getInitialListOfServers());
            return servers;
        }
    
        @Override
        public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
            List<DiscoveryEnabledServer> servers = setZones(
                    this.list.getUpdatedListOfServers());
            return servers;
        }
    
        private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
            List<DiscoveryEnabledServer> result = new ArrayList<>();
            boolean isSecure = this.ribbon.isSecure(true);
            boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
            for (DiscoveryEnabledServer server : servers) {
                result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
                        this.approximateZoneFromHostname));
            }
            return result;
        }
    
    }
    
    • 2.2.5 StaticServerList
      A fixed server list. Source:
    public class StaticServerList<T extends Server> implements ServerList<T> {
    
        private final List<T> servers;
    
        public StaticServerList(T... servers) {
            this.servers = Arrays.asList(servers);
        }
    
        @Override
        public List<T> getInitialListOfServers() {
            return servers;
        }
    
        @Override
        public List<T> getUpdatedListOfServers() {
            return servers;
        }
    }
    
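    • 2.3 ServerListUpdater
      This interface controls when the server list is refreshed, for example how often. The refresh itself is already implemented by ServerList, so ServerListUpdater is only responsible for scheduling.


      ServerListUpdater class diagram
    • 2.3.1 PollingServerListUpdater
      The default server list updater.
    • 2.3.2 EurekaNotificationServerListUpdater
      A server list updater triggered by EurekaEvent notifications.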

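    3. How the dynamic server list works

    Zuul relies on Ribbon to pull the server list dynamically,
    as the source code makes clear.

    • 3.1 Injecting the ServerListUpdater
      Start with the configuration class: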
    @Configuration
    @EnableConfigurationProperties
    @Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
            RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
    public class RibbonClientConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
            return new PollingServerListUpdater(config);
        }
    }
    

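    The ServerListUpdater is registered as a bean in the container, which is how it gets initialized.

    • 3.2 How PollingServerListUpdater works
      Section 3.1 shows that the concrete type is PollingServerListUpdater, so the rest of this section looks at how it is implemented.

    Start with the fields of PollingServerListUpdater: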

    public class PollingServerListUpdater implements ServerListUpdater {
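        // Delay before the first fetch of the server list: 1 second (default)
        private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
        // Interval between refreshes of the server list: 30 seconds (default)
        private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
        // Whether the update task has been started
        private final AtomicBoolean isActive = new AtomicBoolean(false);
        // Time of the last update
        private volatile long lastUpdated = System.currentTimeMillis();
        // Initial delay before the task first runs
        private final long initialDelayMs;
        // Delay between runs of the task
        private final long refreshIntervalMs;
        // Handle to the scheduled task. It is volatile because start() assigns it
        // while the task itself reads it on a pool thread.
        private volatile ScheduledFuture<?> scheduledFuture;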
    }
    

    Most of these fields are related to the scheduled task.

    • 3.2.1 Thread pool initialization
    public class PollingServerListUpdater implements ServerListUpdater {
        private static class LazyHolder {
            private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
            private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
            private static Thread _shutdownThread;
    
            static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
    
            static {
                int coreSize = poolSizeProp.get();
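                // Thread factory: every thread in the pool is a daemon thread, so the pool
                // does not keep the JVM alive once all user threads have finished.
                // In a web service the user threads are typically the request-handling threads.
                ThreadFactory factory = (new ThreadFactoryBuilder())
                        .setNameFormat("PollingServerListUpdater-%d")
                        .setDaemon(true)
                        .build();
                // Create the thread pool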
                _serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
                poolSizeProp.addCallback(new Runnable() {
                    @Override
                    public void run() {
                        _serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
                    }
    
                });
                _shutdownThread = new Thread(new Runnable() {
                    public void run() {
                        logger.info("Shutting down the Executor Pool for PollingServerListUpdater");
                        shutdownExecutorPool();
                    }
                });
                // On JVM exit, shut the pool down in an orderly way through a shutdown hook
                // instead of killing it abruptly while a task may still be running.
                Runtime.getRuntime().addShutdownHook(_shutdownThread);
            }
    
            private static void shutdownExecutorPool() {
                if (_serverListRefreshExecutor != null) {
                    _serverListRefreshExecutor.shutdown();
    
                    if (_shutdownThread != null) {
                        try {
                            Runtime.getRuntime().removeShutdownHook(_shutdownThread);
                        } catch (IllegalStateException ise) { 
                        }
                    }
    
                }
            }
        }
    
    }
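
The same initialization pattern can be reproduced with the JDK alone. The following is a minimal sketch under stated assumptions, not Ribbon's code: DaemonPoolSketch, Holder, executor and workerIsDaemon are hypothetical names, the pool size is hard-coded to 2 instead of being read from a dynamic property, and a hand-written ThreadFactory replaces Guava's ThreadFactoryBuilder.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only sketch of a lazily created daemon scheduler.
final class DaemonPoolSketch {

    // The pool is created the first time Holder is referenced (lazy holder idiom).
    private static final class Holder {
        static final ScheduledThreadPoolExecutor EXECUTOR = create();

        private static ScheduledThreadPoolExecutor create() {
            final AtomicInteger counter = new AtomicInteger();
            ThreadFactory factory = new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "sketch-updater-" + counter.getAndIncrement());
                    t.setDaemon(true); // daemon threads do not keep the JVM alive
                    return t;
                }
            };
            final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, factory);
            // Stop accepting new tasks once the JVM starts to exit.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    executor.shutdown();
                }
            }));
            return executor;
        }
    }

    static ScheduledThreadPoolExecutor executor() {
        return Holder.EXECUTOR;
    }

    // Runs one trivial task and reports whether the worker thread is a daemon.
    static boolean workerIsDaemon() {
        Callable<Boolean> probe = new Callable<Boolean>() {
            @Override
            public Boolean call() {
                return Thread.currentThread().isDaemon();
            }
        };
        try {
            return executor().submit(probe).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(workerIsDaemon());
    }
}
```

Because the workers are daemon threads, the program exits as soon as main returns even though the pool was never shut down explicitly.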
    
    • 3.2.2 Starting PollingServerListUpdater
    • 3.2.2.1 When it is started
    @SuppressWarnings("deprecation")
    @Configuration
    @EnableConfigurationProperties
    @Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
            RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
    public class RibbonClientConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
            if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
                return this.propertiesFactory.get(ILoadBalancer.class, config, name);
            }
            // ZoneAwareLoadBalancer is a subclass of DynamicServerListLoadBalancer, so the parent constructor runs
            return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                    serverListFilter, serverListUpdater);
        }
    }
    
    public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
        public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                             ServerList<T> serverList, ServerListFilter<T> filter,
                                             ServerListUpdater serverListUpdater) {
            // ... (some code omitted)
            restOfInit(clientConfig);
        }
    
        void restOfInit(IClientConfig clientConfig) {
            // ... (code omitted)
            enableAndInitLearnNewServersFeature();
            // ... (code omitted)
        }
    
        public void enableAndInitLearnNewServersFeature() {
            // serverListUpdater comes from the container; as shown earlier, the injected type is PollingServerListUpdater
            serverListUpdater.start(updateAction);
        }
    }
    
    • 3.2.2.2 启动方法
    public class PollingServerListUpdater implements ServerListUpdater {
        @Override
        public synchronized void start(final UpdateAction updateAction) {
            if (isActive.compareAndSet(false, true)) {
                final Runnable wrapperRunnable = new Runnable() {
                    @Override
                    public void run() {
                        if (!isActive.get()) {
                            if (scheduledFuture != null) {
                                scheduledFuture.cancel(true);
                            }
                            return;
                        }
                        try {
                            updateAction.doUpdate();
                            lastUpdated = System.currentTimeMillis();
                        } catch (Exception e) {
                            logger.warn("Failed one update cycle", e);
                        }
                    }
                };
    
                scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                        wrapperRunnable,
                        initialDelayMs,
                        refreshIntervalMs,
                        TimeUnit.MILLISECONDS
                );
            } else {
                logger.info("Already active, no-op");
            }
        }
    }
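
The start-once guard and the fixed-delay scheduling used by start() can be tried out with a small stand-alone class. This is a sketch, not Ribbon's implementation: PollerSketch, start, stop and demo are hypothetical names, and the executor is a private single-thread scheduler rather than the shared pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the start-once polling pattern.
final class PollerSketch {
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> future;

    // Returns true if this call started the polling, false if it was already active.
    boolean start(final Runnable action, long initialDelayMs, long intervalMs) {
        if (!active.compareAndSet(false, true)) {
            return false;
        }
        future = executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    action.run();
                } catch (RuntimeException e) {
                    // Swallow the failure: an exception escaping the task would
                    // suppress all later executions of the schedule.
                    System.err.println("Failed one update cycle: " + e);
                }
            }
        }, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    void stop() {
        if (active.compareAndSet(true, false) && future != null) {
            future.cancel(true);
        }
        executor.shutdownNow();
    }

    // Starts twice, waits for two cycles, and reports whether everything behaved as expected.
    static boolean demo() {
        final CountDownLatch twoCycles = new CountDownLatch(2);
        PollerSketch poller = new PollerSketch();
        Runnable action = new Runnable() {
            @Override
            public void run() {
                twoCycles.countDown();
            }
        };
        boolean first = poller.start(action, 0, 10);
        boolean second = poller.start(action, 0, 10); // no-op: already active
        boolean ranTwice;
        try {
            ranTwice = twoCycles.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            ranTwice = false;
        } finally {
            poller.stop();
        }
        return first && !second && ranTwice;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With scheduleWithFixedDelay the interval is measured from the end of one run to the start of the next, so a slow update never overlaps with the following one.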
    
    • 3.2.3 What PollingServerListUpdater does
    public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
        volatile ServerListFilter<T> filter;
        volatile ServerList<T> serverListImpl;
    
        protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
            @Override
            public void doUpdate() {
                // Refresh the server list
                updateListOfServers();
            }
        };
    
        public void updateListOfServers() {
            List<T> servers = new ArrayList<T>();
            if (serverListImpl != null) {
                // Fetch the servers through ServerList; here the implementation is DomainExtractingServerList
                servers = serverListImpl.getUpdatedListOfServers();
                // Filter the servers through ServerListFilter
                if (filter != null) {
                    servers = filter.getFilteredListOfServers(servers);
                }
            }
            updateAllServerList(servers);
        }
    }
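
The fetch-then-filter step in updateListOfServers boils down to the small pipeline below. It is a simplified sketch in which servers are plain strings; UpdatePipelineSketch, update, current and demo are hypothetical names, with a Supplier standing in for ServerList and a UnaryOperator standing in for ServerListFilter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the fetch-then-filter step; servers are plain strings here.
final class UpdatePipelineSketch {
    private final Supplier<List<String>> serverList;  // plays the role of ServerList
    private final UnaryOperator<List<String>> filter; // plays the role of ServerListFilter, may be null
    private volatile List<String> current = new ArrayList<>();

    UpdatePipelineSketch(Supplier<List<String>> serverList, UnaryOperator<List<String>> filter) {
        this.serverList = serverList;
        this.filter = filter;
    }

    // Mirrors updateListOfServers: fetch first, then filter, then publish the result.
    void update() {
        List<String> servers = new ArrayList<>();
        if (serverList != null) {
            servers = serverList.get();
            if (filter != null) {
                servers = filter.apply(servers);
            }
        }
        current = servers;
    }

    List<String> current() {
        return current;
    }

    static List<String> demo() {
        Supplier<List<String>> source = () -> Arrays.asList("a:80", "b:80", "c:81");
        UnaryOperator<List<String>> onlyPort80 = servers -> {
            List<String> kept = new ArrayList<>();
            for (String server : servers) {
                if (server.endsWith(":80")) {
                    kept.add(server);
                }
            }
            return kept;
        };
        UpdatePipelineSketch pipeline = new UpdatePipelineSketch(source, onlyPort80);
        pipeline.update();
        return pipeline.current();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a:80, b:80]
    }
}
```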
    

    So the server list is ultimately obtained through DomainExtractingServerList.
    How does DomainExtractingServerList get it?

    @Configuration
    public class EurekaRibbonClientConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public ServerList<?> ribbonServerList(IClientConfig config,
                Provider<EurekaClient> eurekaClientProvider) {
            if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
                return this.propertiesFactory.get(ServerList.class, config, serviceId);
            }
            DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                    config, eurekaClientProvider);
            DomainExtractingServerList serverList = new DomainExtractingServerList(
                    discoveryServerList, config, this.approximateZoneFromHostname);
            return serverList;
        }
    }
    
    public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
        // As the constructor call above shows, list is a DiscoveryEnabledNIWSServerList, so the servers are fetched through EurekaClient.
        private ServerList<DiscoveryEnabledServer> list;
        @Override
        public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
            List<DiscoveryEnabledServer> servers = setZones(
                    this.list.getUpdatedListOfServers());
            return servers;
        }
    }
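
The relationship between the two lists is a plain decorator: the outer list delegates the fetch to the inner one and post-processes every element. A minimal sketch of that shape follows, with servers reduced to strings; SimpleServerList, FixedServerList, ZoneTaggingServerList and DecoratorSketch are hypothetical names, not Ribbon types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified counterpart of the ServerList interface.
interface SimpleServerList {
    List<String> getUpdatedListOfServers();
}

// Plays the role of the inner list that actually fetches the servers.
final class FixedServerList implements SimpleServerList {
    private final List<String> servers;

    FixedServerList(List<String> servers) {
        this.servers = servers;
    }

    @Override
    public List<String> getUpdatedListOfServers() {
        return servers;
    }
}

// Plays the role of the outer list: delegate, then decorate each element.
final class ZoneTaggingServerList implements SimpleServerList {
    private final SimpleServerList delegate;
    private final String zone;

    ZoneTaggingServerList(SimpleServerList delegate, String zone) {
        this.delegate = delegate;
        this.zone = zone;
    }

    @Override
    public List<String> getUpdatedListOfServers() {
        List<String> result = new ArrayList<>();
        for (String server : delegate.getUpdatedListOfServers()) {
            result.add(server + "@" + zone);
        }
        return result;
    }
}

final class DecoratorSketch {
    static List<String> demo() {
        SimpleServerList discovered = new FixedServerList(Arrays.asList("host-a:8080", "host-b:8080"));
        SimpleServerList wrapped = new ZoneTaggingServerList(discovered, "zone-1");
        return wrapped.getUpdatedListOfServers();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [host-a:8080@zone-1, host-b:8080@zone-1]
    }
}
```

Callers only see the outer list, so the source of the servers can change without touching the zone handling.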
    
    Flow chart
