美文网首页分布式技术
SpringCloud Alibaba Nacos 配置中心源码

SpringCloud Alibaba Nacos 配置中心源码

作者: DoubleFooker | 来源:发表于2019-05-08 12:23 被阅读0次

    Nacos配置中心

    基于版本1.0.0

    Nacos Client

    使用Client的Demo代码

    /**
     * Minimal Nacos config client demo: creates a {@code ConfigService} for a local server
     * and prints one configuration value fetched from it.
     */
    public class ClientDemo {
        public static void main(String[] args) {
            Properties clientProps = new Properties();
            clientProps.put("serverAddr", "127.0.0.1:8848");
            try {
                ConfigService service = NacosFactory.createConfigService(clientProps);
                // No listener is registered; the value is requested directly.
                // The last argument is the timeout passed to getConfig.
                String fetched = service.getConfig("nacos-demo.properties", "DEFAULT_GROUP", 5000);
                System.out.println(fetched);
            } catch (NacosException e) {
                e.printStackTrace();
            }
        }
    }
    

    整个过程创建了ConfigService,查看NacosFactory.createConfigService(properties);

    /**
     * Factory entry point used by clients; the method shown here forwards to
     * {@code ConfigFactory}.
     */
    public class NacosFactory {
        /**
         * Creates a {@code ConfigService} from the given client properties.
         *
         * @param properties client settings, passed through unchanged
         * @return whatever {@code ConfigFactory.createConfigService} returns
         * @throws NacosException propagated from {@code ConfigFactory}
         */
        public static ConfigService createConfigService(Properties properties) 
            throws NacosException {
            // Delegates to ConfigFactory.
            return ConfigFactory.createConfigService(properties);
        }
    //.....
    }
    

    可以得知通过ConfigFactory工厂类创建ConfigService,继续查看,通过反射创建实例

    /**
     * Factory that instantiates the config-service implementation reflectively.
     */
    public class ConfigFactory {
        /**
         * Creates a {@code ConfigService} by loading
         * {@code com.alibaba.nacos.client.config.NacosConfigService} by name and invoking its
         * {@code (Properties)} constructor.
         *
         * @param properties client settings handed to the implementation's constructor
         * @return the newly constructed service
         * @throws NacosException with code -400 when the class cannot be loaded or constructed;
         *     the original failure is attached as the cause
         */
        public static ConfigService createConfigService(Properties properties) 
            throws NacosException {
            try {
                Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
                Constructor<?> constructor = driverImplClass.getConstructor(Properties.class);
                // Instantiate reflectively.
                return (ConfigService) constructor.newInstance(properties);
            } catch (Throwable e) {
                // A failure inside the constructor arrives wrapped in InvocationTargetException,
                // whose own message is null; report the underlying failure's message instead.
                Throwable root = e;
                if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) {
                    root = e.getCause();
                }
                NacosException wrapped = new NacosException(-400, root.getMessage());
                // Keep the full stack for diagnosis.
                // NOTE(review): assumes the (int, String) constructor does not set a cause itself.
                wrapped.initCause(e);
                throw wrapped;
            }
        }
    }
    

    通过反射形式创建了NacosConfigService实例。ConfigService是个接口,其定义如下

    /**
     * Client-side contract for reading, publishing, removing and watching configuration.
     */
    public interface ConfigService {

        /** Gets the configuration content for {@code dataId} / {@code group}. */
        String getConfig(String dataId, String group, long timeoutMs) throws NacosException;

        /** Adds a listener that is told about updates to the given configuration. */
        void addListener(String dataId, String group, Listener listener) throws NacosException;

        /** Publishes {@code content} as the given configuration. */
        boolean publishConfig(String dataId, String group, String content) throws NacosException;

        /** Deletes the given configuration. */
        boolean removeConfig(String dataId, String group) throws NacosException;

        /** Removes a previously added listener. */
        void removeListener(String dataId, String group, Listener listener);

        /** Returns the server status as a string. */
        String getServerStatus();
    }
    

    NacosConfigService实现如下:

    /**
     * {@code ConfigService} implementation that talks to the server through an
     * {@code HttpAgent} and hands change detection to a {@code ClientWorker}.
     */
    public class NacosConfigService implements ConfigService {
        // A fixed value shared by every instance, so it is a class constant rather than a
        // per-instance field.
        private static final long POST_TIMEOUT = 3000L;
        private static final String EMPTY = "";
        private HttpAgent agent;
        // Worker that runs the long-polling tasks.
        private ClientWorker worker;
        private String namespace;
        private String encode;
        private ConfigFilterChainManager configFilterChainManager = 
            new ConfigFilterChainManager();

        /**
         * Builds the service: resolves the encoding, initialises the namespace, starts the
         * HTTP agent and creates the long-polling worker.
         *
         * @param properties client settings; {@code PropertyKeyConst.ENCODE} is optional
         * @throws NacosException if initialisation fails
         */
        public NacosConfigService(Properties properties) throws NacosException {
            String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
            // Fall back to the default encoding when none is configured.
            encode = StringUtils.isBlank(encodeTmp) ? Constants.ENCODE : encodeTmp.trim();
            initNamespace(properties);
            // Agent built from the client properties (server address, auth-related settings).
            agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
            // NOTE(review): the original note says start() begins watching the server list for
            // changes and is synchronous — not visible here, confirm in ServerHttpAgent.
            agent.start();
            // Long-polling worker: the piece that detects configuration updates.
            worker = new ClientWorker(agent, configFilterChainManager);
        }
        //...
    }
    
    

    ClientWorker源码

        /**
         * Creates the worker, its two thread pools, and starts the periodic configuration check.
         *
         * @param agent HTTP agent; its name is appended to the worker thread names
         * @param configFilterChainManager filter chain kept for later use
         */
        public ClientWorker(final HttpAgent agent,
                            final ConfigFilterChainManager configFilterChainManager) {
            this.agent = agent;
            this.configFilterChainManager = configFilterChainManager;

            // One scheduler thread drives checkConfigInfo().
            executor = Executors.newScheduledThreadPool(1,
                daemonThreads("com.alibaba.nacos.client.Worker.", agent));
            // Cached pool for the long-polling runnables.
            executorService = Executors.newCachedThreadPool(
                daemonThreads("com.alibaba.nacos.client.Worker.longPolling", agent));

            Runnable periodicCheck = new Runnable() {
                public void run() {
                    try {
                        // Starts long-polling tasks as the number of cached entries grows.
                        checkConfigInfo();
                    } catch (Throwable ignored) {
                        // Swallowed: an exception escaping a scheduled task suppresses later runs.
                    }
                }
            };
            // First run after 1 ms, then 10 ms after each run finishes.
            executor.scheduleWithFixedDelay(periodicCheck, 1L, 10L, TimeUnit.MILLISECONDS);
        }

        /** Builds a factory for daemon threads named {@code prefix + agent.getName()}. */
        private static ThreadFactory daemonThreads(final String prefix, final HttpAgent agent) {
            return new ThreadFactory() {
                @Override
                public Thread newThread(Runnable task) {
                    Thread worker = new Thread(task);
                    // The name is resolved when the thread is created, not when the factory is.
                    worker.setName(prefix + agent.getName());
                    worker.setDaemon(true);
                    return worker;
                }
            };
        }
        /**
         * Starts additional long-polling tasks when the number of cached entries needs more
         * tasks than have been started so far.
         */
        public void checkConfigInfo() {
            // cacheMap maps groupKey -> CacheData; a groupKey is built from dataId + group.
            final int cachedCount = cacheMap.get().size();
            // NOTE(review): if getPerTaskConfigSize() returns an integer type, this division
            // truncates before ceil() — confirm it returns a floating-point value.
            final int requiredTasks = (int)Math.ceil(cachedCount / ParamUtil.getPerTaskConfigSize());
            if (requiredTasks <= currentLongingTaskCount) {
                return;
            }
            int nextTaskId = (int)currentLongingTaskCount;
            while (nextTaskId < requiredTasks) {
                // One runnable per task id; each handles the entries carrying that id.
                executorService.execute(new LongPollingRunnable(nextTaskId));
                nextTaskId++;
            }
            currentLongingTaskCount = requiredTasks;
        }
    

    LongPollingRunnable结构

        /**
         * Long-polling task for the cache entries whose task id equals this task's id.
         * Each run checks the local failover copy of every such entry, asks the server which
         * group keys changed, fetches the new content for those keys, runs the listener md5
         * check, and finally re-submits itself to {@code executorService}.
         */
        class LongPollingRunnable implements Runnable {
            // Selects which CacheData entries this task handles.
            private int taskId;
            public LongPollingRunnable(int taskId) {
                this.taskId = taskId;
            }
            public void run() {
                try {
                    List<CacheData> cacheDatas = new ArrayList<CacheData>();
                    for (CacheData cacheData : cacheMap.get().values()) {
                        if (cacheData.getTaskId() == taskId) {
                            cacheDatas.add(cacheData);
                            try {
                                // Check the local failover copy.
                                checkLocalConfig(cacheData);
                                if (cacheData.isUseLocalConfigInfo()) {
                                    // Local content is in use: run the listener md5 check now.
                                    cacheData.checkListenerMd5();
                                }
                            } catch (Exception e) {
                                // A failure for one entry is ignored; the loop continues.
                            }
                        }
                    }
                    List<String> inInitializingCacheList = new ArrayList<String>();
                    // Ask the server which group keys changed; the call also receives
                    // inInitializingCacheList (per the original note, it fills that list).
                    List<String> changedGroupKeys = 
                        checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                    // Fetch the latest content for every changed key.
                    for (String groupKey : changedGroupKeys) {
                        String[] key = GroupKey.parseKey(groupKey);
                        String dataId = key[0];
                        String group = key[1];
                        String tenant = null;
                        // A third component, when present, is the tenant.
                        if (key.length == 3) {
                            tenant = key[2];
                        }
                        try {
                            // Pull the content from the server (3000 ms timeout argument).
                  String content = getServerConfig(dataId, group, tenant, 3000L);
                  CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                            // NOTE(review): cache would be null if the entry is no longer in
                            // cacheMap; the resulting exception lands in the outer catch.
                            cache.setContent(content);// store the changed content on the entry
                        } catch (NacosException ioe) {
                            // Ignored; remaining keys are still processed.
                        }
                    }
                    for (CacheData cacheData : cacheDatas) {
                        if (!cacheData.isInitializing() || inInitializingCacheList
                            .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                            // Per the original note: compares the listeners' md5 with the current
                            // one and, when they differ, notifies them via receiveConfigInfo.
                            cacheData.checkListenerMd5();
                            cacheData.setInitializing(false);
                        }
                    }
                    inInitializingCacheList.clear();
                } catch (Throwable e) {
                    // Swallowed so that the finally block below always re-submits the task.
                } finally {
                    // Re-submit this task, which turns it into a loop.
                    executorService.execute(this);
                }
            }
        }
    
    

    checkUpdateDataIds代码中调用 checkUpdateConfigStr

    
        /**
         * Asks the server which of the probed configurations have changed.
         * Per the original note, only dataId and group are meaningful in the returned
         * entries. Never returns null.
         *
         * @param probeUpdateString probe payload (per the original note, built from
         *     dataId, group, md5 and tenant); a blank value yields an empty result
         * @param isInitializingCacheList when true, a no-hang-up header is sent so the server
         *     answers without holding the request (see the server side for details)
         * @return the changed group keys, or an empty list when nothing changed or the
         *     request failed
         */
    List<String> checkUpdateConfigStr(String probeUpdateString, 
                                      boolean isInitializingCacheList) {
        // Nothing to probe: return before building any request state.
        if (StringUtils.isBlank(probeUpdateString)) {
            return Collections.emptyList();
        }
        List<String> params = 
            Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
        // 30-second long-polling timeout, sent to the server as a header.
        long timeout = TimeUnit.SECONDS.toMillis(30L);
        List<String> headers = new ArrayList<String>(4);
        headers.add("Long-Pulling-Timeout");
        headers.add(String.valueOf(timeout));
        // Tell the server not to hold the request while new cache entries are initializing.
        if (isInitializingCacheList) {
            headers.add("Long-Pulling-Timeout-No-Hangup");
            headers.add("true");
        }
        try {
            // POST to Constants.CONFIG_CONTROLLER_PATH + "/listener" to get the update info.
            HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
                agent.getEncode(), timeout);
            if (HttpURLConnection.HTTP_OK == result.code) {
                setHealthServer(true);
                // Group keys that have updates.
                return parseUpdateDataIdResponse(result.content);
            }
            setHealthServer(false);
        } catch (IOException e) {
            // The request did not complete: mark the server unhealthy, as for a non-200 reply,
            // instead of leaving the previous health state in place.
            setHealthServer(false);
        }
        return Collections.emptyList();
    }
    

    checkLocalConfig方法

        /**
         * Brings {@code cacheData} in line with its local failover file.
         * <ul>
         *   <li>File appeared, or changed since the recorded version: load its content and
         *       mark the entry as using local config.</li>
         *   <li>File disappeared: stop using local config. Listeners are not notified here;
         *       per the original note that happens once the server content is fetched.</li>
         * </ul>
         *
         * @param cacheData the cache entry to check; updated in place
         */
        private void checkLocalConfig(CacheData cacheData) {
            final String dataId = cacheData.dataId;
            final String group = cacheData.group;
            final String tenant = cacheData.tenant;
            // NOTE(review): the original comment places these files under
            // ${JM.SNAPSHOT.PATH, default user.home}/nacos/config/data — confirm in
            // LocalConfigInfoProcessor.
            File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
            // Take one snapshot so every branch sees the same state.
            final boolean usingLocal = cacheData.isUseLocalConfigInfo();
            final boolean fileExists = path.exists();

            if (usingLocal && !fileExists) {
                cacheData.setUseLocalConfigInfo(false);
                return;
            }

            final boolean appeared = !usingLocal && fileExists;
            final boolean modified = usingLocal && fileExists
                && cacheData.getLocalConfigInfoVersion() != path.lastModified();
            if (appeared || modified) {
                String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
                cacheData.setUseLocalConfigInfo(true);
                cacheData.setLocalConfigInfoVersion(path.lastModified());
                cacheData.setContent(content);
            }
        }
    

    服务端代码导读

    API接口/nacos/v1/cs/configs/listener,接口代码

        /**
         * Long-polling endpoint: reads the probe string sent by the client, converts it into
         * a group-key to md5 map and hands the request to {@code inner.doPollingConfig}.
         *
         * @param request must carry a non-blank {@code Listening-Configs} parameter
         * @param response passed on to the polling logic
         * @throws IllegalArgumentException if the parameter is blank or cannot be parsed; the
         *     parse failure is attached as the cause
         * @throws ServletException propagated from the polling logic
         * @throws IOException propagated from decoding or the polling logic
         */
        @RequestMapping(value = "/listener", method = RequestMethod.POST)
        public void listener(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
            request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
            // Group keys sent by the client.
            String probeModify = request.getParameter("Listening-Configs");
            if (StringUtils.isBlank(probeModify)) {
                throw new IllegalArgumentException("invalid probeModify");
            }
            probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
            Map<String, String> clientMd5Map;
            try {
                clientMd5Map = MD5Util.getClientMd5Map(probeModify);
            } catch (Throwable e) {
                // Same message as before, but keep the parse failure for diagnosis.
                throw new IllegalArgumentException("invalid probeModify", e);
            }
            // Do long-polling; inner is the ConfigServletInner.
            inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
        }
    

    ConfigServletInner实现

    /**
     * Server-side helper holding the polling logic (excerpt; the class is not shown in full).
     */
    public class ConfigServletInner {
        @Autowired
        private LongPollingService longPollingService;
        @Autowired
        private PersistService persistService;
        private static final int TRY_GET_LOCK_TIMES = 9;
        private static final int START_LONGPOLLING_VERSION_NUM = 204;
        /**
         * Polling entry point.
         *
         * @param request the client's polling request
         * @param response the response for that request
         * @param clientMd5Map md5 values reported by the client, keyed by group key
         * @param probeRequestSize length of the probe string
         * @return {@code HttpServletResponse.SC_OK} as a string, on both visible paths
         */
        public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize)
            throws IOException, ServletException {
            // Long polling: hand the request to LongPollingService when it is supported.
            if (LongPollingService.isSupportLongPolling(request)) {
                longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
                return HttpServletResponse.SC_OK + "";
            }
           // ... short-polling code omitted
            return HttpServletResponse.SC_OK + "";
        }
    

    longPollingService.addLongPollingClient

        /**
         * Handles one long-polling request: answers at once if something already changed (or
         * the client asked not to be held), otherwise switches the request to async mode and
         * hands it to a {@code ClientLongPolling} task.
         *
         * @param req the polling request; must carry a numeric long-polling timeout header
         * @param rsp the response for that request
         * @param clientMd5Map md5 values reported by the client, keyed by group key
         * @param probeRequestSize length of the probe string
         */
        public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {
            String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
            String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
            String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
            String tag = req.getHeader("Vipserver-Tag");
            int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
            // Respond delayTime ms (default 500) before the client's own timeout so the client
            // does not time out first; never hold for less than 10 seconds.
            long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
            if (isFixedPolling()) {
                timeout = Math.max(10000, getFixedPollingInterval());
            } else {
                List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
                if (!changedGroups.isEmpty()) {
                    // Something already differs: answer immediately.
                    generateResponse(req, rsp, changedGroups);
                    return;
                }
                if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                    // Client asked not to be held; return without suspending the request.
                    return;
                }
            }
            String ip = RequestUtil.getRemoteIp(req);
            // Must be called on the HTTP thread; otherwise the container sends the response as
            // soon as this method returns.
            final AsyncContext asyncContext = req.startAsync();
            // AsyncContext.setTimeout() is not accurate, so the timeout is managed manually.
            asyncContext.setTimeout(0L);
            // The task is submitted for immediate execution; its run() is what schedules the
            // timeout response after `timeout` ms (see ClientLongPolling).
            scheduler.execute(
                new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
        }
    

    ClientLongPolling代码

    /**
     * Captures the state of one held long-polling request.
     *
     * @param ac async context of the suspended request
     * @param clientMd5Map md5 values reported by the client, keyed by group key
     * @param ip client address
     * @param probeRequestSize length of the probe string
     * @param timeoutTime delay in milliseconds before the timeout response is sent
     * @param appName client application name
     * @param tag client tag
     */
    ClientLongPolling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize,long timeoutTime, String appName, String tag) {
        // Moment this subscription object was created.
        this.createTime = System.currentTimeMillis();

        // Request being held.
        this.asyncContext = ac;
        this.timeoutTime = timeoutTime;

        // What the client sent.
        this.clientMd5Map = clientMd5Map;
        this.probeRequestSize = probeRequestSize;

        // Who the client is.
        this.ip = ip;
        this.appName = appName;
        this.tag = tag;
    }
    /**
     * Schedules the timeout response for this request, then adds the request to
     * {@code allSubs}, the {@code Queue<ClientLongPolling>} of held client requests.
     */
    public void run() {
        Runnable onTimeout = new Runnable() {
            public void run() {
                try {
                    getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                    // Drop the subscription before answering.
                    allSubs.remove(ClientLongPolling.this);

                    // null is sent unless fixed polling finds changed groups.
                    List<String> reply = null;
                    if (isFixedPolling()) {
                        // Compare md5 values to find the changed group keys.
                        List<String> changedGroups = MD5Util.compareMd5(
                            (HttpServletRequest)asyncContext.getRequest(),
                            (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                        if (changedGroups.size() > 0) {
                            reply = changedGroups;
                        }
                    }
                    sendResponse(reply);
                } catch (Throwable ignored) {
                    // Swallowed, as in the original.
                }
            }
        };
        asyncTimeoutFuture = scheduler.schedule(onTimeout, timeoutTime, TimeUnit.MILLISECONDS);
        allSubs.add(this);
    }
    

    配置修改API

        /**
         * Config publish endpoint (excerpt: parameters and the start of the body are elided).
         * Persists the configuration as a normal, tagged or beta publish, fires a
         * {@code ConfigDataChangeEvent}, logs a persistence trace event and returns true.
         */
        @RequestMapping(method = RequestMethod.POST)
        @ResponseBody
        public Boolean publishConfig(...)
            throws NacosException {
           //...
            final Timestamp time = TimeUtils.getCurrentTime();
            String betaIps = request.getHeader("betaIps");
            ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
            if (StringUtils.isBlank(betaIps)) {
                if (StringUtils.isBlank(tag)) {
                    // Normal publish: no beta IPs and no tag.
                    persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                    // Key call: this event is what leads to clients being notified.
                    EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
                } else {
                    // Tagged publish.
    persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                    EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
                }
            } else { // beta publish
    persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
    EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
            }
            ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
            return true;
        }
    
    

    EventDispatcher.fireEvent

        /**
         * Delivers {@code event} to every listener registered for the event's class.
         * A listener that throws is logged and does not stop delivery to the others.
         *
         * @param event the event to dispatch
         * @throws IllegalArgumentException if {@code event} is null
         */
        public static void fireEvent(Event event) {
            if (event == null) {
                throw new IllegalArgumentException();
            }
            // Listeners come from the entry registered for this event class.
            for (AbstractEventListener subscriber : getEntry(event.getClass()).listeners) {
                try {
                    subscriber.onEvent(event);
                } catch (Exception failure) {
                    log.error(failure.toString(), failure);
                }
            }
        }
    

    而listeners维护在Entry中,Entry从LISTENER_HUB中获取,而LISTENER_HUB则通过addEventListener添加listener

    /**
     * Base class for event listeners. Constructing an instance registers it through
     * {@code EventDispatcher.addEventListener}, so look at the subclasses to see which
     * listeners exist.
     */
    public abstract static class AbstractEventListener {
        /** Registers the new listener with the dispatcher. */
        public AbstractEventListener() {
            EventDispatcher.addEventListener(this);
        }
    }
    

    查看LongPollingService:客户端添加listener后,其长轮询请求的订阅关系由allSubs维护;配置变更时会触发onEvent事件,onEvent执行DataChangeTask任务。

        /**
         * Reacts to a {@code LocalDataChangeEvent} by submitting a {@code DataChangeTask} to
         * the scheduler. Does nothing in fixed-polling mode or for other event types.
         *
         * @param event the dispatched event
         */
        @Override
        public void onEvent(Event event) {
            // Fixed-polling mode ignores events.
            if (isFixedPolling()) {
                return;
            }
            if (!(event instanceof LocalDataChangeEvent)) {
                return;
            }
            LocalDataChangeEvent change = (LocalDataChangeEvent)event;
            scheduler.execute(new DataChangeTask(change.groupKey, change.isBeta, change.betaIps));
        }
    

    DataChangeTask 实现

            /**
             * Walks the held long-polling requests and, for each one subscribed to
             * {@code groupKey}, removes it from {@code allSubs} and sends it a response
             * carrying that group key. (Excerpt: part of the loop body is elided.)
             */
            public void run() {
                try {
                    // Return value is not used here.
                    ConfigService.getContentBetaMd5(groupKey);
                    for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                        ClientLongPolling clientSub = iter.next();
                        // Only requests that subscribed to this group key are affected.
                        if (clientSub.clientMd5Map.containsKey(groupKey)) {
                            // ...
                            getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                            iter.remove(); // drop the subscription
                            // Answer the held long-polling request with the changed key.
                            clientSub.sendResponse(Arrays.asList(groupKey));
                        }
                    }
                } catch (Throwable t) {
                    // Swallowed, as in the original excerpt.
                }
            }
    

    相关文章

      网友评论

        本文标题:SpringCloud Alibaba Nacos 配置中心源码

        本文链接:https://www.haomeiwen.com/subject/tcrmoqtx.html