Nacos Config Center Source Code Analysis (2)

Author: ok200 | Published: 2021-04-01 14:52

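Nacos dynamic configuration

The previous article analyzed the source code that Nacos uses to fetch configuration from the config center: https://www.jianshu.com/p/b1e5155ed00a

This article analyzes how Nacos dynamically applies changes made in the config center. It covers two topics:

  • Dynamically updating the server address list
  • Dynamically updating configuration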

Nacos version

<dependency>
     <groupId>com.alibaba.cloud</groupId>
     <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
     <version>2.0.0.RELEASE</version>
</dependency>

bootstrap.yaml

spring:
  application:
    name: service1
  profiles:
    active: dev
  cloud:
    nacos:
      config:
        namespace: 14d29622-bf23-4d4a-b86d-cdedc18ff83b
        file-extension: yaml
        server-addr: 127.0.0.1:8848
        ext-config[0]:
          data-id: common-${spring.profiles.active}.yaml
          refresh: true

I. Dynamically updating the server address list (thread-pool polling)

1. We start again from NacosConfigService, one of the core classes in Nacos. First, look at its constructor:

public NacosConfigService(Properties properties) throws NacosException {
        String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
        if (StringUtils.isBlank(encodeTmp)) {
            encode = Constants.ENCODE;
        } else {
            encode = encodeTmp.trim();
        }
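        // Initialize the namespace
        initNamespace(properties);
        // Initialize the HTTP agent
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        // Start a scheduled thread that refreshes the server address list
        agent.start();
        // Build the client worker
        worker = new ClientWorker(agent, configFilterChainManager, properties);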
        worker = new ClientWorker(agent, configFilterChainManager, properties);
}

2. agent.start() (which ends up in ServerListManager#start) starts the scheduled thread that fetches the server address list:

public synchronized void start() throws NacosException {

        if (isStarted || isFixed) {
            return;
        }
        // Task that fetches the server list
        GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);
        for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {
            getServersTask.run();
            try {
                this.wait((i + 1) * 100L);
            } catch (Exception e) {
                LOGGER.warn("get serverlist fail,url: {}", addressServerUrl);
            }
        }

        if (serverUrls.isEmpty()) {
            LOGGER.error("[init-serverlist] fail to get NACOS-server serverlist! env: {}, url: {}", name,
                addressServerUrl);
            throw new NacosException(NacosException.SERVER_ERROR,
                "fail to get NACOS-server serverlist! env:" + name + ", not connnect url:" + addressServerUrl);
        }
        // Refresh every 30s; TimerService is backed by a single-threaded pool
        TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
        isStarted = true;
    }
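
The startup pattern above (retry a few times synchronously, fail fast if nothing was found, then hand over to a fixed-delay schedule) can be sketched in isolation. This is a minimal sketch, not Nacos code: `ServerListPoller`, `fetch`, `refreshOnce`, and the parameters of `start` are hypothetical names chosen for illustration. Only `ScheduledExecutorService.scheduleWithFixedDelay` is the real JDK API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical illustration of "retry at startup, then poll"; not Nacos code.
class ServerListPoller {
    private final Supplier<List<String>> fetch; // assumed source of server addresses
    private volatile List<String> serverUrls = Collections.emptyList();
    private final ScheduledExecutorService timer =
        Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "server-list-poller");
            t.setDaemon(true); // do not keep the JVM alive just for polling
            return t;
        });

    ServerListPoller(Supplier<List<String>> fetch) {
        this.fetch = fetch;
    }

    List<String> current() {
        return serverUrls;
    }

    // Swap in the fetched list only if it is non-empty and differs from the cached one.
    boolean refreshOnce() {
        List<String> latest = fetch.get();
        if (latest == null || latest.isEmpty() || latest.equals(serverUrls)) {
            return false;
        }
        serverUrls = Collections.unmodifiableList(new ArrayList<>(latest));
        return true;
    }

    void start(int retries, long delaySeconds) {
        // Synchronous attempts so that callers see a usable list right after start().
        for (int i = 0; i < retries && serverUrls.isEmpty(); i++) {
            refreshOnce();
        }
        if (serverUrls.isEmpty()) {
            throw new IllegalStateException("no server address available");
        }
        // An uncaught exception would cancel all later runs, so it is caught here.
        timer.scheduleWithFixedDelay(() -> {
            try {
                refreshOnce();
            } catch (RuntimeException e) {
                System.err.println("refresh failed: " + e);
            }
        }, 0L, delaySeconds, TimeUnit.SECONDS);
    }
}
```

Catching exceptions inside the scheduled task matters: with `scheduleWithFixedDelay`, a task that throws is never run again. The GetServerListTask shown next wraps its body in try/catch for the same reason.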

The GetServerListTask inner class:

class GetServerListTask implements Runnable {
        final String url;

        GetServerListTask(String url) {
            this.url = url;
        }

        @Override
        public void run() {
            /**
             * get serverlist from nameserver
             */
            try {
                // Update the server addresses if they changed
                updateIfChanged(getApacheServerList(url, name));
            } catch (Exception e) {
                LOGGER.error("[" + name + "][update-serverlist] failed to update serverlist from address server!",
                    e);
            }
        }
    }

The task fetches the server address list over HTTP, compares it with the current list, and updates the list only when the two differ:

private List<String> getApacheServerList(String url, String name) {
        try {
            HttpResult httpResult = HttpSimpleClient.httpGet(url, null, null, null, 3000);

            if (HttpURLConnection.HTTP_OK == httpResult.code) {
                if (DEFAULT_NAME.equals(name)) {
                    EnvUtil.setSelfEnv(httpResult.headers);
                }
                List<String> lines = IOUtils.readLines(new StringReader(httpResult.content));
                List<String> result = new ArrayList<String>(lines.size());
                for (String serverAddr : lines) {
                    if (org.apache.commons.lang3.StringUtils.isNotBlank(serverAddr)) {
                        String[] ipPort = serverAddr.trim().split(":");
                        String ip = ipPort[0].trim();
                        if (ipPort.length == 1) {
                            result.add(ip + ":" + ParamUtil.getDefaultServerPort());
                        } else {
                            result.add(serverAddr);
                        }
                    }
                }
                return result;
            } else {
                LOGGER.error("[check-serverlist] error. addressServerUrl: {}, code: {}", addressServerUrl,
                    httpResult.code);
                return null;
            }
        } catch (IOException e) {
            LOGGER.error("[check-serverlist] exception. url: " + url, e);
            return null;
        }
    }
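
The line-parsing rule in getApacheServerList (skip blank lines, append a default port when a line has none) can be tried on its own. The sketch below is illustrative only: `ServerAddressParser` is a hypothetical class, and the value 8848 is an assumption standing in for whatever `ParamUtil.getDefaultServerPort()` returns in a given setup.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone version of the parsing rule; not Nacos code.
class ServerAddressParser {
    // Assumed default; the real client reads it from ParamUtil.
    static final int DEFAULT_PORT = 8848;

    // One address per line; lines without ":port" get the default port.
    static List<String> parse(String body) {
        List<String> result = new ArrayList<>();
        for (String line : body.split("\\r?\\n")) {
            String addr = line.trim();
            if (addr.isEmpty()) {
                continue; // ignore blank lines
            }
            result.add(addr.contains(":") ? addr : addr + ":" + DEFAULT_PORT);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("10.0.0.1\n 10.0.0.2:9000 \n\n"));
    }
}
```

Unlike the original, this sketch trims every address before storing it. Neither version handles IPv6 literals, which contain colons themselves.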

II. Dynamically updating configuration

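Dynamic configuration updates center on two classes: NacosContextRefresher and ClientWorker.

// Implements ApplicationListener; once SpringApplication.run() has the application ready, onApplicationEvent is called and registers the Nacos listeners
public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware

ClientWorker handles the actual dynamic-config logic. The main flow is:

  1. Create two thread pools: one triggers the config-update check on a schedule, with a 10 ms delay between runs; the other runs the grouped long-polling checks on multiple threads.
  2. Ask the server which config files have changed.
  3. Fetch the content of each changed config file.
  4. Publish a RefreshEvent through the listener and let Spring handle it.
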
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        // Initialize the timeout parameter

        init(properties);
        // Single-threaded pool
        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        // Multi-threaded pool for long polling
        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        // Run checkConfigInfo() on a schedule, with a 10 ms delay between runs
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
}

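Starting the polling tasks

// Start the long-polling tasks
public void checkConfigInfo() {
        // Split the listeners into tasks
        int listenerSize = cacheMap.get().size();
        // Round up to get the number of batches
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // Need to check whether the task is already running; this needs careful thought. The task list is currently unordered, so changes to it may cause problems.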
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
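
The batch count is a ceiling division, and it only rounds up correctly because the divisor is a floating-point value. A minimal sketch of the arithmetic, assuming a per-task size of 3000 (the figure is an assumption here, and `LongPollingTaskMath` is a hypothetical helper, not part of Nacos):

```java
// Hypothetical helper showing the rounding behaviour; not Nacos code.
class LongPollingTaskMath {
    // perTaskConfigSize is a double so that the division is not truncated before ceil().
    static int taskCount(int listenerSize, double perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    // Same formula with an int divisor: the quotient is truncated first, so ceil() has no effect.
    static int truncatedTaskCount(int listenerSize, int perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    public static void main(String[] args) {
        System.out.println(taskCount(3001, 3000.0));        // two tasks are needed
        System.out.println(truncatedTaskCount(3001, 3000)); // integer division loses the extra listener
    }
}
```

With 3001 listeners the double version yields 2 tasks, while the int version yields 1 and would leave one config without a polling task. Note also that checkConfigInfo only ever starts new tasks when the count grows; it never stops tasks when the count shrinks.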

The run method of LongPollingRunnable:

public void run() {

        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // check failover config
            for (CacheData cacheData : cacheMap.get().values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }

            // Ask the server which config files have changed
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    // Fetch the content of the changed config file
                    String content = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(content);
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
                        agent.getName(), dataId, group, tenant, cache.getMd5(),
                        ContentUtils.truncateContent(content));
                } catch (NacosException ioe) {
                    String message = String.format(
                        "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                        agent.getName(), dataId, group, tenant);
                    LOGGER.error(message, ioe);
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    // Notify the listeners if the config has changed
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();

            executorService.execute(this);

        } catch (Throwable e) {

            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}
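
The end of run() shows how the task keeps itself alive: after a successful round it resubmits itself immediately, and after a failure it reschedules itself with a penalty delay (taskPenaltyTime). The sketch below isolates that pattern. `SelfReschedulingTask`, `work`, `penaltyMillis`, and `demo` are hypothetical names for illustration; it is not the Nacos implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a self-rescheduling task with a failure penalty.
class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService pool;
    private final Runnable work;
    private final long penaltyMillis;

    SelfReschedulingTask(ScheduledExecutorService pool, Runnable work, long penaltyMillis) {
        this.pool = pool;
        this.work = work;
        this.penaltyMillis = penaltyMillis;
    }

    @Override
    public void run() {
        if (pool.isShutdown()) {
            return; // stop the loop once the pool is closed
        }
        try {
            work.run();
            pool.execute(this); // success: go straight into the next round
        } catch (Throwable t) {
            // failure: back off before trying again
            pool.schedule(this, penaltyMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Runs until the work has been called three times (the first call fails); returns the call count.
    static int demo() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger calls = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(3);
        Runnable work = () -> {
            int n = calls.incrementAndGet();
            done.countDown();
            if (n == 1) {
                throw new IllegalStateException("simulated poll failure");
            }
        };
        pool.execute(new SelfReschedulingTask(pool, work, 10L));
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return calls.get();
    }
}
```

In this sketch the work returns instantly, so immediate resubmission spins the thread. In the real client that is not a concern as long as the check blocks on the server's long-polling request, which paces the loop.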

The key part is checkListenerMd5, which calls back into the listener registered by NacosContextRefresher#registerNacosListener:

void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, md5, wrap);
        }
    }
}

private void safeNotifyListener(final String dataId, final String group, final String content,
                                final String md5, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;

    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
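                // Before the callback, switch the thread's context classloader to the webapp's own classloader, so that SPI calls inside the callback do not fail or resolve the wrong implementation (only an issue in multi-application deployments).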
                Thread.currentThread().setContextClassLoader(appClassLoader);

                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                // Callback that delivers the new config to the listener
                listener.receiveConfigInfo(contentTmp);
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                    listener);
            } catch (NacosException de) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
                    dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
                    md5, listener, t.getCause());
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };

    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,
            md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
        name, (finishNotify - startNotify), dataId, group, md5, listener);
}
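
The change detection above comes down to comparing the MD5 of the current content with the MD5 each listener last saw. A minimal stand-alone sketch of that idea follows. `Md5NotifyingCache` and its methods are hypothetical and simplified (no executor, no filter chain, no classloader switching), so treat it as an illustration rather than the CacheData implementation.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of per-listener MD5 tracking.
class Md5NotifyingCache {
    private static final class ListenerWrap {
        final Consumer<String> listener;
        String lastCallMd5; // MD5 of the content this listener was last notified with

        ListenerWrap(Consumer<String> listener, String md5) {
            this.listener = listener;
            this.lastCallMd5 = md5;
        }
    }

    private final List<ListenerWrap> listeners = new ArrayList<>();
    private String content = "";
    private String md5 = md5Hex("");

    void addListener(Consumer<String> listener) {
        // A new listener starts in sync with the current content.
        listeners.add(new ListenerWrap(listener, md5));
    }

    void setContent(String newContent) {
        content = newContent;
        md5 = md5Hex(newContent);
    }

    // Notifies every listener whose last-seen MD5 differs; returns how many were notified.
    int checkListenerMd5() {
        int notified = 0;
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                wrap.listener.accept(content);
                wrap.lastCallMd5 = md5;
                notified++;
            }
        }
        return notified;
    }

    static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the MD5 is tracked per listener, calling the check repeatedly is cheap and idempotent: a listener is only called again after the content actually changes.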

The listener publishes a refresh event, which completes the config update:

private void registerNacosListener(final String group, final String dataId) {
    Listener listener = (Listener)this.listenerMap.computeIfAbsent(dataId, (i) -> {
        return new Listener() {
            public void receiveConfigInfo(String configInfo) {
                NacosContextRefresher.refreshCountIncrement();
                String md5 = "";
                if (!StringUtils.isEmpty(configInfo)) {
                    try {
                        MessageDigest md = MessageDigest.getInstance("MD5");
                        md5 = (new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))).toString(16);
                    } catch (UnsupportedEncodingException | NoSuchAlgorithmException var4) {
                        NacosContextRefresher.log.warn("[Nacos] unable to get md5 for dataId: " + dataId, var4);
                    }
                }

                NacosContextRefresher.this.refreshHistory.add(dataId, md5);
                // Publish the refresh event
                NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));
                if (NacosContextRefresher.log.isDebugEnabled()) {
                    NacosContextRefresher.log.debug("Refresh Nacos config group " + group + ",dataId" + dataId);
                }

            }

            public Executor getExecutor() {
                return null;
            }
        };
    });

    try {
        this.configService.addListener(dataId, group, listener);
    } catch (NacosException var5) {
        var5.printStackTrace();
    }

}
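
One detail in the listener above: `new BigInteger(1, digest).toString(16)` drops leading zeros, so the recorded MD5 string can be shorter than 32 characters. In the code shown this value is only passed to refreshHistory, so it should not affect change detection, but it will not always match an MD5 printed elsewhere. The sketch below demonstrates the difference; `Md5Hex` is a hypothetical helper, not part of Nacos or Spring Cloud Alibaba.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper comparing unpadded and zero-padded MD5 hex strings.
class Md5Hex {
    static byte[] digest(String s) {
        try {
            return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Same conversion as in the listener: leading zeros are lost.
    static String unpadded(String s) {
        return new BigInteger(1, digest(s)).toString(16);
    }

    // Zero-padded to the full 32 hex characters.
    static String padded(String s) {
        return String.format("%032x", new BigInteger(1, digest(s)));
    }

    public static void main(String[] args) {
        System.out.println(unpadded("a")); // 31 characters
        System.out.println(padded("a"));   // 0cc175b9c0f1b6a831c399e269772661
    }
}
```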
