美文网首页
nacos2.0.4配置监听分析

nacos2.0.4配置监听分析

作者: SparkOnly | 来源:发表于2022-03-15 18:09 被阅读0次

    本篇文章从界面发布配置开始,分析整个配置发布到应用客户端变更的通用过程

    主体流程

    客户端

    ClientWorker,每个5秒执行一次配置监听(发送ConfigBatchListenRequest)。如果时间间隔超过5分钟,则同步所有配置

    服务端

    ConfigController修改配置

    1. ConfigChangePublisher发布ConfigDataChangeEvent

    2. AsyncNotifyService监听ConfigDataChangeEvent
      2.1 执行AsyncRpcTask(含有各个成员的NotifySingleRpcTask的queue)
      2.2 遍历queue
      2.2.1 如果是自己,dump
      2.2.2 如果是成员,异步同步配置改变ConfigChangeClusterSyncRequest(dataId, group, isBata, lastModified, tag, tenant)
      接收节点,dump,其余流程同3

    3. 异步执行DumpProcessor
      3.1 从config_info里查询出配置内容
      3.2 执行ConfigCacheService.dump
      3.2.1 保存到data目录
      3.2.2 更新CacheItem的md5和修改时间,发布LocalDataChangeEvent事件

    4. RpcConfigChangeNotifier监听到LocalDataChangeEvent,遍历监听的所有客户端连接,构建 ConfigChangeNotifyRequest,异步执行RpcPushTask,推送变更

    5. 客户端ClientWorker接受到ConfigChangeNotifyRequest事件后,将对应的缓存置为不同步,调用配置监听动作

    服务端处理

    1. 页面请求,可以看到,调用了/v1/cs/config接口,对应的就是com.alibaba.nacos.config.server.controller.ConfigController#publishConfig方法


      页面发布配置
    2. ConfigController#publishConfig处理
      在没有灰度IP,没有标签的情况下,会保存配置信息到config_info表,插入历史记录表his_config_info
      同时发布配置变更事件:ConfigDataChangeEvent
      具体代码如下:
    @PostMapping
        @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
        public Boolean publishConfig(...) throws NacosException {
            ...
            final Timestamp time = TimeUtils.getCurrentTime();
            # 灰度IP
            String betaIps = request.getHeader("betaIps");
            ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
            configInfo.setType(type);
            if (StringUtils.isBlank(betaIps)) {
                if (StringUtils.isBlank(tag)) {
                    # 没有标签,则保存信息到表config_info,插入历史记录表his_config_info
                    persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                    # 发布ConfigDataChangeEvent事件
                    ConfigChangePublisher
                            .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
                } else {
                    persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                    ConfigChangePublisher.notifyConfigChange(
                            new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
                }
            } else {
                // beta publish
                persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
                ConfigChangePublisher
                        .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
            }
            ConfigTraceService
                    .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
                            ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
            return true;
        }
    
    1. 接下来到监听配置变更事件的AsyncNotifyService
      首先是事先注册好监听的事件
      事件处理器里,可以看到遍历成员,会构建两个队列,一个是http队列,一个是rpc队列。
      对于支持长连接的成员,会构建NotifySingleRpcTask任务,放入rpc队列。
      对于rpc客户端,异步执行AsyncRpcTask任务
    @Autowired
        public AsyncNotifyService(ServerMemberManager memberManager) {
            this.memberManager = memberManager;
            
            // Register ConfigDataChangeEvent to NotifyCenter.
            NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
            
            // Register A Subscriber to subscribe ConfigDataChangeEvent.
            NotifyCenter.registerSubscriber(new Subscriber() {
                
                @Override
                public void onEvent(Event event) {
                    // Generate ConfigDataChangeEvent concurrently
                    if (event instanceof ConfigDataChangeEvent) {
                        ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                        long dumpTs = evt.lastModifiedTs;
                        String dataId = evt.dataId;
                        String group = evt.group;
                        String tenant = evt.tenant;
                        String tag = evt.tag;
                        Collection<Member> ipList = memberManager.allMembers();
                        
                        // In fact, any type of queue here can be
                        Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
                        Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
                        # 遍历所有成员
                        for (Member member : ipList) {
                            if (!MemberUtil.isSupportedLongCon(member)) {
                                httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                                        evt.isBeta));
                            } else {
                                rpcQueue.add(
                                        new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
                            }
                        }
                        if (!httpQueue.isEmpty()) {
                            ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
                        }
                        if (!rpcQueue.isEmpty()) {
                            # 异步执行rpc任务
                            ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
                        }
                        
                    }
                }
                
                @Override
                public Class<? extends Event> subscribeType() {
                    return ConfigDataChangeEvent.class;
                }
            });
        }
    
    1. AsyncNotifyService.AsyncRpcTask#run方法
      这里遍历第3步构建的队列,取出每个rpc任务,构建ConfigChangeClusterSyncRequest 请求
      4.1 如果成员是自己,dump请求
      4.2 如果成员为成员列表里的其他成员,发送ConfigChangeClusterSyncRequest到对应节点
    public void run() {
                while (!queue.isEmpty()) {
                    NotifySingleRpcTask task = queue.poll();
                    
                    ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
                    syncRequest.setDataId(task.getDataId());
                    syncRequest.setGroup(task.getGroup());
                    syncRequest.setBeta(task.isBeta);
                    syncRequest.setLastModified(task.getLastModified());
                    syncRequest.setTag(task.tag);
                    syncRequest.setTenant(task.getTenant());
                    Member member = task.member;
                    # 任务的成员为自己
                    if (memberManager.getSelf().equals(member)) {
                        if (syncRequest.isBeta()) {
                            dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                    syncRequest.getLastModified(), NetUtils.localIP(), true);
                        } else {
                            # 正常发布
                            dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                    syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
                        }
                        continue;
                    }
                    # 成员列表里的其他成员
                    if (memberManager.hasMember(member.getAddress())) {
                        // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
                        boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
                        if (unHealthNeedDelay) {
                            // target ip is unhealthy, then put it in the notification list
                            ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                                    task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                                    0, member.getAddress());
                            // get delay time and set fail count to the task
                            asyncTaskExecute(task);
                        } else {
        
                            if (!MemberUtil.isSupportedLongCon(member)) {
                                asyncTaskExecute(
                                        new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
                                                task.getLastModified(), member.getAddress(), task.isBeta));
                            } else {
                                try {
                                    configClusterRpcClientProxy
                                            .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
                                } catch (Exception e) {
                                    MetricsMonitor.getConfigNotifyException().increment();
                                    asyncTaskExecute(task);
                                }
                            }
                          
                        }
                    } else {
                        //No nothig if  member has offline.
                    }
                    
                }
            }
    

    4.3 其他成员节点,ConfigChangeClusterSyncRequestHandler,接收到ConfigChangeClusterSyncRequest ,同样执行dump操作


    其他节点,同样dump

    5 dump操作
    com.alibaba.nacos.config.server.service.dump.DumpService#dump
    5.1 添加一个DumpTask任务

    public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
                boolean isBeta) {
            String groupKey = GroupKey2.getKey(dataId, group, tenant);
            String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
            dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
            DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
        }
    

    5.2 在DumpService实例化时,设置了dupTaskMgr的默认任务处理器DumpProcessor


    dumpTaskMgr默认任务处理器

    6 Dump任务处理
    com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process
    这里主要是根据任务里的参数查找配置信息,执行DumpConfigHandler#configDump

    public boolean process(NacosTask task) {
            final PersistService persistService = dumpService.getPersistService();
            DumpTask dumpTask = (DumpTask) task;
            String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
            String dataId = pair[0];
            String group = pair[1];
            String tenant = pair[2];
            long lastModified = dumpTask.getLastModified();
            String handleIp = dumpTask.getHandleIp();
            boolean isBeta = dumpTask.isBeta();
            String tag = dumpTask.getTag();
            
            ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
                    .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
            
            if (isBeta) {
                // if publish beta, then dump config, update beta cache
                ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
                
                build.remove(Objects.isNull(cf));
                build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
                build.content(Objects.isNull(cf) ? null : cf.getContent());
                
                return DumpConfigHandler.configDump(build.build());
            }
            if (StringUtils.isBlank(tag)) {
                # 查找配置信息
                ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
    
                build.remove(Objects.isNull(cf));
                build.content(Objects.isNull(cf) ? null : cf.getContent());
                build.type(Objects.isNull(cf) ? null : cf.getType());
            } else {
                ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
    
                build.remove(Objects.isNull(cf));
                build.content(Objects.isNull(cf) ? null : cf.getContent());
    
            }
            return DumpConfigHandler.configDump(build.build());
        }
    

    7 配置dump
    com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump
    更新事件中,调用ConfigCacheService.dump执行dump操作

    public static boolean configDump(ConfigDumpEvent event) {
            final String dataId = event.getDataId();
            final String group = event.getGroup();
            final String namespaceId = event.getNamespaceId();
            final String content = event.getContent();
            final String type = event.getType();
            final long lastModified = event.getLastModifiedTs();
            if (event.isBeta()) {
                ...
            }
            if (StringUtils.isBlank(event.getTag())) {
                ...
                boolean result;
                if (!event.isRemove()) {
                   # 非删除事件,执行dump操作
                    result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
                    
                    if (result) {
                        ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                                ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                                content.length());
                    }
                } else {
                    result = ConfigCacheService.remove(dataId, group, namespaceId);
                    
                    if (result) {
                        ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                                ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                    }
                }
                return result;
            } else {
                ...
            }
            
        }
    

    8 ConfigCacheService#dump操作
    保存配置文件,更新cache里的md5值,同时发布LocalDataChangeEvent事件

    public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
                String type) {
            String groupKey = GroupKey2.getKey(dataId, group, tenant);
            CacheItem ci = makeSure(groupKey);
            ci.setType(type);
            final int lockResult = tryWriteLock(groupKey);
            assert (lockResult != 0);
            
            if (lockResult < 0) {
                DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
                return false;
            }
            
            try {
                final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
                
                if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
                    DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                                    + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                            lastModifiedTs);
                } else if (!PropertyUtil.isDirectRead()) {
                   # 保存配置到目录/data/tenant-config-data
                    DiskUtil.saveToDisk(dataId, group, tenant, content);
                }
                # 更新缓存里的md5,发布LocalDataChangeEvent事件
                updateMd5(groupKey, md5, lastModifiedTs);
                return true;
            } catch (IOException ioe) {
                DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
                if (ioe.getMessage() != null) {
                    String errMsg = ioe.getMessage();
                    if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
                            .contains(DISK_QUATA_EN)) {
                        // Protect from disk full.
                        FATAL_LOG.error("磁盘满自杀退出", ioe);
                        System.exit(0);
                    }
                }
                return false;
            } finally {
                releaseWriteLock(groupKey);
            }
        }
    

    更新md5的操作。更新cache里的md5和最后修改时间,发布LocalDataChangeEvent事件

    public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
            CacheItem cache = makeSure(groupKey);
            if (cache.md5 == null || !cache.md5.equals(md5)) {
                cache.md5 = md5;
                cache.lastModifiedTs = lastModifiedTs;
                NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
            }
        }
    

    9 RpcConfigChangeNotifier处理LocalDataChangeEvent事件
    遍历groupKey对应的所有客户端连接,构造ConfigChangeNotifyRequest 请求,推送给客户端

    public void onEvent(LocalDataChangeEvent event) {
            String groupKey = event.groupKey;
            boolean isBeta = event.isBeta;
            List<String> betaIps = event.betaIps;
            String[] strings = GroupKey.parseKey(groupKey);
            String dataId = strings[0];
            String group = strings[1];
            String tenant = strings.length > 2 ? strings[2] : "";
            String tag = event.tag;
            
            configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);
            
        }
    
    public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
                List<String> betaIps, String tag) {
            
            Set<String> listeners = configChangeListenContext.getListeners(groupKey);
            if (CollectionUtils.isEmpty(listeners)) {
                return;
            }
            int notifyClientCount = 0;
            for (final String client : listeners) {
                Connection connection = connectionManager.getConnection(client);
                if (connection == null) {
                    continue;
                }
    
                //beta ips check.
                String clientIp = connection.getMetaInfo().getClientIp();
                String clientTag = connection.getMetaInfo().getTag();
                if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
                    continue;
                }
                //tag check
                if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
                    continue;
                }
    
                ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
    
                RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp,
                        connection.getMetaInfo().getAppName());
                push(rpcPushRetryTask);
                notifyClientCount++;
            }
            Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);
        }
    

    客户端处理

    1. 初始化ClientWorker,构建ConfigRpcTransportClient,每隔5秒执行配置监听


      初始化ConfigRpcTransportClient
      定期执行配置监听
    2. 配置监听逻辑,如果距离上次全量同步时间达到5分钟,则全量同步
      2.1 遍历所有缓存的数据,跳过已同步的缓存,根据缓存是否存在监听器,构造listenCachesMap和removeListenCachesMap
      2.2 遍历listenCachesMap,构造ConfigBatchListenRequest 请求,发送到服务端。根据响应构造changeKey,从配置中心拉取配置,检查监听器的md5和数据的md5是否一致,不一致就调用监听器的监听方法
      2.3 遍历removeListenCachesMap,构造ConfigBatchListenRequest 请求,移除服务端的监听。如果成功则移除本地缓存
    public void executeConfigListen() {
                Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
                Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
                long now = System.currentTimeMillis();
                boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
                for (CacheData cache : cacheMap.get().values()) {
                    synchronized (cache) {
                        //check local listeners consistent.
                        # 如果本缓存已经和服务端同步 && 不需要全量同步,就跳过处理
                        if (cache.isSyncWithServer()) {
                            cache.checkListenerMd5();
                            if (!needAllSync) {
                                continue;
                            }
                        }
                        
                        if (!CollectionUtils.isEmpty(cache.getListeners())) {
                            //get listen  config
                            if (!cache.isUseLocalConfigInfo()) {
                                List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
                                if (cacheDatas == null) {
                                    cacheDatas = new LinkedList<>();
                                    listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                                }
                                cacheDatas.add(cache);
                                
                            }
                        } else if (CollectionUtils.isEmpty(cache.getListeners())) {
                            if (!cache.isUseLocalConfigInfo()) {
                                List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
                                if (cacheDatas == null) {
                                    cacheDatas = new LinkedList<>();
                                    removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                                }
                                cacheDatas.add(cache);
                                
                            }
                        }
                    }
                    
                }
                
                boolean hasChangedKeys = false;
                # 有监听器的缓存处理
                if (!listenCachesMap.isEmpty()) {
                    for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
                        String taskId = entry.getKey();
                        Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
                        
                        List<CacheData> listenCaches = entry.getValue();
                        for (CacheData cacheData : listenCaches) {
                            timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),
                                    cacheData.getLastModifiedTs().longValue());
                        }
                        
                        ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
                        configChangeListenRequest.setListen(true);
                        try {
                            RpcClient rpcClient = ensureRpcClient(taskId);
                            ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
                                    rpcClient, configChangeListenRequest);
                            if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
                                
                                Set<String> changeKeys = new HashSet<String>();
                                //handle changed keys,notify listener
                                if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
                                    hasChangedKeys = true;
                                    for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
                                            .getChangedConfigs()) {
                                        String changeKey = GroupKey
                                                .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
                                                        changeConfig.getTenant());
                                        changeKeys.add(changeKey);
                                        boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
                                        refreshContentAndCheck(changeKey, !isInitializing);
                                    }
                                    
                                }
                                
                                //handler content configs
                                for (CacheData cacheData : listenCaches) {
                                    String groupKey = GroupKey
                                            .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
                                    if (!changeKeys.contains(groupKey)) {
                                        //sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
                                        synchronized (cacheData) {
                                            if (!cacheData.getListeners().isEmpty()) {
                                                
                                                Long previousTimesStamp = timestampMap.get(groupKey);
                                                if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp,
                                                        System.currentTimeMillis())) {
                                                    continue;
                                                }
                                                cacheData.setSyncWithServer(true);
                                            }
                                        }
                                    }
                                    
                                    cacheData.setInitializing(false);
                                }
                                
                            }
                        } catch (Exception e) {
                            
                            LOGGER.error("Async listen config change error ", e);
                            try {
                                Thread.sleep(50L);
                            } catch (InterruptedException interruptedException) {
                                //ignore
                            }
                        }
                    }
                }
                # 无监听器的缓存处理
                if (!removeListenCachesMap.isEmpty()) {
                    for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
                        String taskId = entry.getKey();
                        List<CacheData> removeListenCaches = entry.getValue();
                        ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
                        configChangeListenRequest.setListen(false);
                        try {
                            RpcClient rpcClient = ensureRpcClient(taskId);
                            boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
                            if (removeSuccess) {
                                for (CacheData cacheData : removeListenCaches) {
                                    synchronized (cacheData) {
                                        if (cacheData.getListeners().isEmpty()) {
                                            ClientWorker.this
                                                    .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
                                        }
                                    }
                                }
                            }
                            
                        } catch (Exception e) {
                            LOGGER.error("async remove listen config change error ", e);
                        }
                        try {
                            Thread.sleep(50L);
                        } catch (InterruptedException interruptedException) {
                            //ignore
                        }
                    }
                }
                
                if (needAllSync) {
                    lastAllSyncTime = now;
                }
                //If has changed keys,notify re sync md5.
                if (hasChangedKeys) {
                    notifyListenConfig();
                }
            }
    
    private void refreshContentAndCheck(String groupKey, boolean notify) {
            if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
                CacheData cache = cacheMap.get().get(groupKey);
                # 获取服务端配置,检查md5和缓存的是否一致,不一致则执行监听器方法
                refreshContentAndCheck(cache, notify);
            }
        }
        
        private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
            try {
                # 获取服务端配置
                ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
                        notify);
                cacheData.setContent(response.getContent());
                cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
                if (null != response.getConfigType()) {
                    cacheData.setType(response.getConfigType());
                }
                if (notify) {
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                            agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
                            ContentUtils.truncateContent(response.getContent()), response.getConfigType());
                }
                # 缓存数据的md5和监听器的md5不一致,
                cacheData.checkListenerMd5();
            } catch (Exception e) {
                LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
                        cacheData.group, cacheData.tenant, e);
            }
        }
    
    1. CacheData
      检查监听器缓存的md5是否和数据的md5一致,不一致则触发监听器的receiveConfigChange,推送变更
    void checkListenerMd5() {
            for (ManagerListenerWrap wrap : listeners) {
                if (!md5.equals(wrap.lastCallMd5)) {
                    safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
                }
            }
        }
    
    private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
                final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
            final Listener listener = listenerWrap.listener;
            if (listenerWrap.inNotifying) {
                LOGGER.warn(
                        "[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",
                        name, dataId, group, md5, listener);
                return;
            }
            Runnable job = () -> {
                long start = System.currentTimeMillis();
                ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
                ClassLoader appClassLoader = listener.getClass().getClassLoader();
                try {
                    if (listener instanceof AbstractSharedListener) {
                        AbstractSharedListener adapter = (AbstractSharedListener) listener;
                        adapter.fillContext(dataId, group);
                        LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                    }
                    // Before executing the callback, set the thread classloader to the classloader of
                    // the specific webapp to avoid exceptions or misuses when calling the spi interface in
                    // the callback method (this problem occurs only in multi-application deployment).
                    Thread.currentThread().setContextClassLoader(appClassLoader);
                    
                    ConfigResponse cr = new ConfigResponse();
                    cr.setDataId(dataId);
                    cr.setGroup(group);
                    cr.setContent(content);
                    cr.setEncryptedDataKey(encryptedDataKey);
                    configFilterChainManager.doFilter(null, cr);
                    String contentTmp = cr.getContent();
                    listenerWrap.inNotifying = true;
                    // 监听器执行监听动作
                    listener.receiveConfigInfo(contentTmp);
                    // compare lastContent and content
                    if (listener instanceof AbstractConfigChangeListener) {
                        Map data = ConfigChangeHandler.getInstance()
                                .parseChangeData(listenerWrap.lastContent, content, type);
                        ConfigChangeEvent event = new ConfigChangeEvent(data);
                        ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                        listenerWrap.lastContent = content;
                    }
                    
                    listenerWrap.lastCallMd5 = md5;
                    LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name,
                            dataId, group, md5, listener, (System.currentTimeMillis() - start));
                } catch (NacosException ex) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                            name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
                } catch (Throwable t) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                            group, md5, listener, t.getCause());
                } finally {
                    listenerWrap.inNotifying = false;
                    Thread.currentThread().setContextClassLoader(myClassLoader);
                }
            };
            
            final long startNotify = System.currentTimeMillis();
            try {
                if (null != listener.getExecutor()) {
                    listener.getExecutor().execute(job);
                } else {
                    try {
                        INTERNAL_NOTIFIER.submit(job);
                    } catch (RejectedExecutionException rejectedExecutionException) {
                        LOGGER.warn(
                                "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, no available internal notifier,will sync notifier ",
                                name, dataId, group, md5, listener);
                        job.run();
                    } catch (Throwable throwable) {
                        LOGGER.error(
                                "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, submit internal async task fail,throwable= ",
                                name, dataId, group, md5, listener, throwable);
                        job.run();
                    }
                }
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                        group, md5, listener, t.getCause());
            }
            final long finishNotify = System.currentTimeMillis();
            LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
                    name, (finishNotify - startNotify), dataId, group, md5, listener);
        }
    
    1. RpcClient监听服务端主动推送的配置变更ConfigChangeNotifyRequest
      这里会修改缓存的最后修改时间,将缓存状态置为不同步,触发配置监听动作


      RpcClient监听配置变更

    相关文章

      网友评论

          本文标题:nacos2.0.4配置监听分析

          本文链接:https://www.haomeiwen.com/subject/anjydrtx.html