美文网首页
nacos源码2-配置中心-服务端

nacos源码2-配置中心-服务端

作者: modou1618 | 来源:发表于2019-01-05 14:00 被阅读0次

一 配置获取

  • 获取目标配置集读锁,避免和并发写冲突
    int lockResult = tryConfigReadLock(request, response, groupKey);
    tryConfigReadLock调用 tryReadLock尝试获取读锁,最多尝试10次,每次获取失败sleep 1ms。
lockResult值 含义
0 表示不存在目标配置集
大于0 获取锁成功
小雨0 获取锁失败,已经有写锁。
 static public int tryReadLock(String groupKey) {
        CacheItem groupItem = CACHE.get(groupKey);
        int result = (null == groupItem) ? 0 : (groupItem.rwLock.tryReadLock() ? 1 : -1);
        if (result < 0) {
            defaultLog.warn("[read-lock] failed, {}, {}", result, groupKey);
        }
        return result;
    }
  • 获取配置
    根据key获取配置的相关信息CacheItem
    非集群模式从数据库获取配置,集群模式从本地缓存文件获取配置
CacheItem cacheItem = ConfigService.getContentCache(groupKey);
md5 = cacheItem.getMd5();
                            lastModified = cacheItem.getLastModifiedTs();
                            if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
                                configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
                            } else {
                                file = DiskUtil.targetFile(dataId, group, tenant);
                            }

二 配置变更

配置和监听流程图.png

2.1异步通知

  • 配置变更时,先更改数据库信息,同时更改配置表和添加一条配置历史记录。然后走异步通知所有服务端更新本地缓存文件配置。
addConfiTagsRelationAtomic(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
                        configInfo.getTenant());
                    insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");
  • 一次变更触发一个AsyncTask线程任务
    遍历配置服务端列表,依次发送变更通知。
    发送失败或当前服务端连接不稳定,则延迟重发。
  • 延迟时间获取函数
private static int getDelayTime(NotifySingleTask task) {
        int failCount = task.getFailCount();
        int delay = MINRETRYINTERVAL + failCount * failCount * INCREASESTEPS;
        if (failCount <= MAXCOUNT) {
            task.setFailCount(failCount + 1);
        }
        return delay;
    }

    private static int MINRETRYINTERVAL = 500;
    private static int INCREASESTEPS = 1000;
    private static int MAXCOUNT = 6;

2.2 文件缓存

  • 创建缓存任务
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
                     boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    }
  • DumpProcessor执行缓存任务
    获取写锁
    更新缓存配置文件
    更新内存缓存cachItem的md5
ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified)

dump(String dataId, String group, String tenant, String content, long lastModifiedTs) {
final int lockResult = tryWriteLock(groupKey);
DiskUtil.saveToDisk(dataId, group, tenant, content);
updateMd5(groupKey, md5, lastModifiedTs);
}

2.3 变更监听响应

  • 创建线程执行DataChangeTask
  • final Queue<ClientLongPulling> allSubs,存储所有监听配置变更的客户端长轮询信息ClientLongPulling。
    final Map<String, String> clientMd5Map; 客户端监听的配置集集合
  • 遍历监听列表,监听当前变更的配置集,则发送响应
LongPollingService.DataChangeTask.run() {
for (Iterator<ClientLongPulling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPulling clientSub = iter.next();
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {
                        iter.remove(); // 删除订阅关系
                        clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
}
  • 组装变更的配置集信息,asyncContext.complete(); 发送响应

三 配置变更监听

3.1 短轮询

  • 解析监听的配置集集合
  • 检查客户端md5和当前缓存的cachItem的md5,放回变更配置集集合

3.2 长轮询

  • 获取长轮询超时时间,和立即返回参数
  • 非固定周期轮询,则先判断是否有配置变更或有LONG_PULLING_NO_HANG_UP_HEADER,有则立即返回变更的配置集集合
  • http请求异步化
    final AsyncContext asyncContext = req.startAsync()
  • 启动异步线程ClientLongPulling
    延迟线程,对超时无变更的客户端返回响应,避免请求超时。
    延迟线程中,对isFixedPolling()固定轮询的情况会先检查是否有配置变更再返回结果。
    在allSubs中保存监听变更的客户端信息。 在2.3节配置变更时进行响应。
public void addLongPullingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                     int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_PULLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_PULLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        /**
         * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance
         */
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // do nothing but set fix polling timeout
        } else {
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                    clientMd5Map.size(), probeRequestSize, changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        // 一定要由HTTP线程调用,否则离开后容器会立即发送响应
        final AsyncContext asyncContext = req.startAsync();
        // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
        asyncContext.setTimeout(0L);

        scheduler.execute(
            new ClientLongPulling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

四 监听查询

4.1 获取监听指定配置集的客户端信息

  • 入口GET /v1/cs/configs/listener 获取监听配置集变更的客户端信息
  • 遍历配置服务端列表, GET /v1/cs/communication/configWatchers 从服务端的final Queue<ClientLongPulling> allSubs中获取配置集的监听客户端信息
  • 合并所有服务端数据。

4.2 获取指定客户端监听的所有配置集信息

  • 入口GET /v1/cs/listener
  • 遍历服务端列表,GET /v1/cs/communication/watcherConfigs,从allSubs中获取客户端监听的所有配置集
  • 合并所有服务端数据。

五 容量限制

  • 使用aop方式对controller接口层的配置发布和删除进行控制。
  • tenant_capacity 存储租户的配置容量
  • group_capacity 存储分组的配置容量,若group_id=""则表示集群的配置容量
    code | 说明
    ---- | ----
    OVER_CLUSTER_QUOTA|超过集群配置个数上限
    OVER_GROUP_QUOTA|超过该Group配置个数上限
    OVER_TENANT_QUOTA|超过该租户配置个数上限
    OVER_MAX_SIZE|超过配置的内容大小上限

相关文章

网友评论

      本文标题:nacos源码2-配置中心-服务端

      本文链接:https://www.haomeiwen.com/subject/bhtrrqtx.html