美文网首页注册中心-Nacos
nacos源码1-配置中心-客户端

nacos源码1-配置中心-客户端

作者: modou1618 | 来源:发表于2019-01-05 11:33 被阅读0次

    nacos官方主页

    一 结构

    结构.png

    二 ConfigFactory

    • 通过ConfigFactory方法,创建NacosConfigService,作为配置管理的入口对象。
    • 一种是通过自定义配置kv方式指定属性,另一种指定配置服务端地址方式。
    public class ConfigFactory {
        public static ConfigService createConfigService(Properties properties) throws NacosException {
            ...
        }
    
        public static ConfigService createConfigService(String serverAddr) throws NacosException {
            ...
        }
    }
    

    三 NacosConfigService

    3.1 配置管理

    3.1.1 配置发布,删除

    接口文档配置请求参数,调用ServerHttpAgent接口

    3.1.2 配置获取

    • 优先级最高使用failvoer配置
    String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
    
    • 次之从配置服务端获取配置
      clientWorker.getServerConfig(dataId, group, tenant, timeoutMs)
    • 最低优先级获取本地snapshot配置
    content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant)
    

    3.2 配置变更监听管理

    调用ClientWork的配置变更监听添加删除接口

    四 ClientWork

    4.1 获取配置接口

    调用ServerHttpAgent接口获取配置,获取成功则保存或删除本地snapshot配置。

    HttpResult result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
    switch (result.code) {
                case HttpURLConnection.HTTP_OK:
                    LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
                    return result.content;
                case HttpURLConnection.HTTP_NOT_FOUND:
                    LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
                    return null;
    ...
    }
    

    4.2 配置变更监听存储

    • 存储监听变更的配置集
      AtomicReference<Map<String/*group key*/, CacheData>> cacheMap
    • group key生成函数
    static public String getKeyTenant(String dataId, String group, String tenant) {
            StringBuilder sb = new StringBuilder();
            urlEncode(dataId, sb);
            sb.append('+');
            urlEncode(group, sb);
            if (StringUtils.isNotEmpty(tenant)) {
                sb.append('+');
                urlEncode(tenant, sb);
            }
            return sb.toString();
        }
    

    4.3 配置变更监听处理

    4.3.1 定时任务

    • 初始化时,创建10ms定时任务,启动LongPullingRunnable任务
    • 每个LongPullingRunnable任务默认处理3000个监听配置集
    • 根据总监听数量计算需要启动的LongPullingRunnable任务数
    • currentLongingTaskCount保存已启动的LongPullingRunnable任务数
    • 根据需要启动新的LongPullingRunnable任务数。
    executor.scheduleWithFixedDelay(new Runnable() {
                public void run() {
                    try {
                        checkConfigInfo();
                    } catch (Throwable e) {
                        log.error(agent.getName(), "NACOS-XXXX", "[sub-check] rotate check error", e);
                    }
                }
            }, 1L, 10L, TimeUnit.MILLISECONDS);
    
    public void checkConfigInfo() {
            // 分任务
            int listenerSize = cacheMap.get().size();
            // 向上取整为批数
            int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
            if (longingTaskCount > currentLongingTaskCount) {
                for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
                    // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
                    executorService.execute(new LongPullingRunnable(i));
                }
                currentLongingTaskCount = longingTaskCount;
            }
        }
    

    4.3.2 LongPullingRunnable

    4.3.2.1 failover配置变更监听

    // check failover config
                    for (CacheData cacheData : cacheMap.get().values()) {
                        if (cacheData.getTaskId() == taskId) {
                            cacheDatas.add(cacheData);
                            try {
                                checkLocalConfig(cacheData);
                                if (cacheData.isUseLocalConfigInfo()) {
                                    cacheData.checkListenerMd5();
                                }
                            } catch (Exception e) {
                                log.error("NACOS-CLIENT", "get local config info error", e);
                            }
                        }
                    }
    
    • 没有->有,有->有且存在变更,更新cachData配置信息
    cacheData.setUseLocalConfigInfo(true);
    cacheData.setLocalConfigInfoVersion(path.lastModified());
    cacheData.setContent(content);
    
    • 有->没有
      cacheData.setUseLocalConfigInfo(false); 从服务端获取配置。

    4.3.2.2 服务端配置变更监听

    • checkUpdateDataIds() 遍历本线程处理的cachData,对其中无本地failover配置的项,组装配置集串,格式为dataId2Group2contentMD52tenant1或者dataId2Group2contentMD5^1。设置Listening-Configs请求参数。
    • checkUpdateConfigStr()执行longpulling
      Long-Pulling-Timeout头指定请求超时时间,默认30s
      Long-Pulling-Timeout-No-Hangup首次监听时配置,告知服务端无论是否有变更都立即返回
    • parseUpdateDataIdResponse()解析服务端返回有变更的配置集。格式与请求参数Listening-Configs格式相同

    4.3.2.3 配置变更监听回调

    • 遍历便跟的配置集,从服务端获取对应配置。更新cachdata
    String content = getServerConfig(dataId, group, tenant, 3000L);
                            CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                            cache.setContent(content);
    
    • 遍历cachData列表,通过配置数据的md5检查是否发生变更,触发变更回调。
    void checkListenerMd5() {
            for (ManagerListenerWrap wrap : listeners) {
                if (!md5.equals(wrap.lastCallMd5)) {
                    safeNotifyListener(dataId, group, content, md5, wrap);
                }
            }
        }
    
    • 监听回调
      如果listener提供了线程执行器,则异步线程调用变更回调,否则在当前LongPullingRunnable线程调用。
    Runnable job = new Runnable() {
                public void run() {
                    ...
                    listener.receiveConfigInfo(contentTmp);
                    listenerWrap.lastCallMd5 = md5;
                    ...
                }
            };
    
    if (null != listener.getExecutor()) {
                    listener.getExecutor().execute(job);
                } else {
                    job.run();
                }
    

    五 ServerHttpAgent

    5.1 ServerListManager配置服务端管理

    5.1.1 配置服务器地址

    • List<String> serverUrls保存服务端地址。
    • ServerAddressIterator对服务端地址随机排序。
    • currentServerAddr存储上次访问成功的服务端地址。尽量保证longpulling配置变更监听的地址和获取配置的是同一个服务器。
    • 访问失败则调用refreshCurrentServerAddr 更新下次使用的服务端地址。
    public void refreshCurrentServerAddr() {
            currentServerAddr = iterator().next();
        }
    
        public String getCurrentServerAddr() {
            if (StringUtils.isBlank(currentServerAddr)) {
                currentServerAddr = iterator().next();
            }
            return currentServerAddr;
        }
    

    5.1.2 配置地址服务器

    使用定时线程,从地址服务器获取最新配置服务器地址列表,更新本地缓存List<String> serverUrls

    5.2 Limiter调用限速

    • 初始化最多对1000个url请求限速。
    • 每次限速区间1分钟
    private static int CAPACITY_SIZE = 1000;
        private static Cache<String, RateLimiter> cache = CacheBuilder.newBuilder()
            .initialCapacity(CAPACITY_SIZE).expireAfterAccess(1, TimeUnit.MINUTES)
            .build();
    
    • 每个限速区间限制5次调用
    rateLimiter = cache.get(accessKeyID, new Callable<RateLimiter>() {
                    @Override
                    public RateLimiter call() throws Exception {
                        return RateLimiter.create(5);
                    }
                });
    
    • 每次请求获取令牌时间最多等待1s
      rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)

    相关文章

      网友评论

        本文标题:nacos源码1-配置中心-客户端

        本文链接:https://www.haomeiwen.com/subject/chtrrqtx.html