美文网首页分布式技术
SpringCloud Alibaba Nacos 配置中心源码

SpringCloud Alibaba Nacos 配置中心源码

作者: DoubleFooker | 来源:发表于2019-05-08 12:23 被阅读0次

Nacos配置中心

基于版本1.0.0

Nacos Client

使用Client的Demo代码

/**
 * Minimal Nacos client example: creates a {@code ConfigService} for one server
 * address and prints a single configuration entry.
 */
public class ClientDemo {
    public static void main(String[] args) {
        final String serverAddr = "127.0.0.1:8848";
        final String dataId = "nacos-demo.properties";
        final String group = "DEFAULT_GROUP";

        final Properties clientProps = new Properties();
        clientProps.setProperty("serverAddr", serverAddr);

        try {
            ConfigService configService = NacosFactory.createConfigService(clientProps);
            // No listener is registered; the configuration is fetched directly (timeoutMs = 5000).
            System.out.println(configService.getConfig(dataId, group, 5000));
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }
}

整个过程创建了ConfigService,查看NacosFactory.createConfigService(properties);

/**
 * Entry-point factory as excerpted by the article. The only member shown
 * forwards to {@code ConfigFactory}; the remaining members are elided.
 */
public class NacosFactory {
    /**
     * Creates a {@code ConfigService} by delegating to
     * {@code ConfigFactory.createConfigService(properties)}.
     *
     * @param properties passed through unchanged to {@code ConfigFactory}
     * @return whatever {@code ConfigFactory.createConfigService} returns
     * @throws NacosException propagated from {@code ConfigFactory}
     */
    public static ConfigService createConfigService(Properties properties) 
        throws NacosException {
        // Follow this call to see how the service is actually constructed.
        return ConfigFactory.createConfigService(properties);
    }
// ... (remaining members omitted in this excerpt)
}

可以得知通过ConfigFactory工厂类创建ConfigService,继续查看,通过反射创建实例

/**
 * Factory that instantiates the client-side {@code ConfigService} implementation
 * through reflection.
 */
public class ConfigFactory {
    /**
     * Loads {@code com.alibaba.nacos.client.config.NacosConfigService} by name and
     * invokes its {@code (Properties)} constructor.
     *
     * @param properties passed to the implementation's constructor
     * @return the newly created service
     * @throws NacosException with error code {@code -400} when the class cannot be
     *         loaded or instantiated; the message describes the underlying failure
     *         and that failure is attached as the cause
     */
    public static ConfigService createConfigService(Properties properties) 
        throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor<?> constructor = driverImplClass.getConstructor(Properties.class);
            // Create the instance reflectively.
            return (ConfigService) constructor.newInstance(properties);
        } catch (Throwable e) {
            // newInstance wraps any exception thrown by the constructor in an
            // InvocationTargetException whose own message is null, so unwrap it to
            // report the real failure instead of "null".
            Throwable failure = e;
            if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) {
                failure = e.getCause();
            }
            String message = failure.getMessage();
            if (message == null) {
                message = failure.toString();
            }
            NacosException wrapped = new NacosException(-400, message);
            try {
                wrapped.initCause(failure);
            } catch (IllegalStateException ignored) {
                // The exception type already fixed its cause; keep the message-only form.
            }
            throw wrapped;
        }
    }
}

通过反射的方式创建了NacosConfigService实例。ConfigService是一个接口,其定义如下

/**
 * Client-facing contract for reading, publishing, removing and watching
 * configuration entries identified by {@code dataId} and {@code group}.
 */
public interface ConfigService {
    /** Gets the configuration content for the given dataId and group. */
    String getConfig(String dataId, String group, long timeoutMs) 
        throws NacosException;
    /** Adds a listener that is notified of configuration updates. */
    void addListener(String dataId, String group, Listener listener) 
        throws NacosException;
    /** Publishes configuration content. */
    boolean publishConfig(String dataId, String group, String content) 
        throws NacosException;
    /** Removes a configuration entry. */
    boolean removeConfig(String dataId, String group) 
        throws NacosException;
    /** Removes a previously added listener. */
    void removeListener(String dataId, String group, Listener listener);
    // Not described in the article; presumably reports the server's status — TODO confirm.
    String getServerStatus();
}

NacosConfigService实现如下:

/**
 * {@code ConfigService} implementation created reflectively by {@code ConfigFactory}.
 * Its constructor resolves the encoding, initialises the namespace, starts the HTTP
 * agent and creates the {@code ClientWorker}.
 */
public class NacosConfigService implements ConfigService {
    private final long POST_TIMEOUT = 3000L;
    private static final String EMPTY = "";
    private HttpAgent agent;
    /** Responsible for running the long-polling tasks. */
    private ClientWorker worker;
    private String namespace;
    private String encode;
    private ConfigFilterChainManager configFilterChainManager = 
        new ConfigFilterChainManager();

    /**
     * @param properties client settings; {@code PropertyKeyConst.ENCODE} is read here and
     *                   the whole object is handed to {@code initNamespace} and
     *                   {@code ServerHttpAgent}
     * @throws NacosException declared by the constructor; the throwing call is not
     *                        identifiable from this excerpt
     */
    public NacosConfigService(Properties properties) throws NacosException {
        // Use the configured encoding when present, otherwise the default constant.
        final String configuredEncode = properties.getProperty(PropertyKeyConst.ENCODE);
        encode = StringUtils.isBlank(configuredEncode) ? Constants.ENCODE : configuredEncode.trim();

        initNamespace(properties);

        // Per the article: the agent carries server information (authentication,
        // server addresses, ...).
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        // Per the article: starting the agent begins watching the server list for
        // changes, and the call is synchronous.
        agent.start();

        // The long-polling worker — the key piece for detecting configuration updates.
        worker = new ClientWorker(agent, configFilterChainManager);
    }
    //...
}

ClientWorker源码

    /**
     * Creates the two executors used by the worker and schedules the periodic
     * configuration check.
     *
     * @param agent                    HTTP agent; its name is appended to the worker thread names
     * @param configFilterChainManager stored for later use by the worker
     */
    public ClientWorker(final HttpAgent agent, 
                        final ConfigFilterChainManager configFilterChainManager) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        executor = Executors.newScheduledThreadPool(1,
            daemonThreadFactory("com.alibaba.nacos.client.Worker.", agent));
        executorService = Executors.newCachedThreadPool(
            daemonThreadFactory("com.alibaba.nacos.client.Worker.longPolling", agent));

        final Runnable configCheck = new Runnable() {
            public void run() {
                try {
                    // Checks whether more long-polling tasks are needed for the cached entries.
                    checkConfigInfo();
                } catch (Throwable e) {
                    // Swallowed: an exception escaping a scheduled task would cancel
                    // all of its subsequent runs.
                }
            }
        };
        // Initial delay 1 ms, then 10 ms between the end of one run and the start of the next.
        executor.scheduleWithFixedDelay(configCheck, 1L, 10L, TimeUnit.MILLISECONDS);
    }

    /** Builds a factory for daemon threads named {@code namePrefix + agent.getName()}. */
    private static ThreadFactory daemonThreadFactory(final String namePrefix, final HttpAgent agent) {
        return new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread worker = new Thread(r);
                // The agent name is read when the thread is created, not when the factory is built.
                worker.setName(namePrefix + agent.getName());
                worker.setDaemon(true);
                return worker;
            }
        };
    }
    /**
     * Starts additional {@code LongPollingRunnable} tasks when the number of cached
     * entries calls for more tasks than have been started so far.
     */
    public void checkConfigInfo() {
        // cacheMap maps groupKey -> CacheData (per the article, the key is built from dataId + group).
        final int cachedEntries = cacheMap.get().size();
        final int requiredTasks = (int)Math.ceil(cachedEntries / ParamUtil.getPerTaskConfigSize());
        if (requiredTasks > currentLongingTaskCount) {
            int taskId = (int)currentLongingTaskCount;
            while (taskId < requiredTasks) {
                // Each new task id gets its own long-polling runnable.
                executorService.execute(new LongPollingRunnable(taskId));
                taskId++;
            }
            currentLongingTaskCount = requiredTasks;
        }
    }

LongPollingRunnable结构

    /**
     * Long-polling task bound to one task id. Each run handles the {@code CacheData}
     * entries whose task id matches, asks the server which of them changed, refreshes
     * the changed entries, and finally re-submits itself to {@code executorService}.
     */
    class LongPollingRunnable implements Runnable {
        private int taskId;
        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }
        public void run() {
            try {
                List<CacheData> cacheDatas = new ArrayList<CacheData>();
                // Collect the entries owned by this task and sync each with its local file.
                for (CacheData cacheData : cacheMap.get().values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            // Check the local cache (failover file) for this entry.
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) {
                                // Per the article: updates the md5 recorded by the listeners.
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                            // Ignored in this excerpt; the loop continues with the next entry.
                        }
                    }
                }
                List<String> inInitializingCacheList = new ArrayList<String>();
                // Ask the config server which entries changed; per the article the call
                // also fills inInitializingCacheList.
                List<String> changedGroupKeys = 
                    checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                // For every changed key, fetch the latest configuration from the server.
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    // A third element, when present, is the tenant.
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {
                        // Fetch the configuration content from the server.
                        // NOTE(review): if the cacheMap lookup below returns null, the resulting
                        // NullPointerException is not a NacosException and is only caught by the
                        // outer catch (Throwable), skipping the remaining keys — confirm.
              String content = getServerConfig(dataId, group, tenant, 3000L);
              CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(content);// store the fetched content on the cache entry
                    } catch (NacosException ioe) {
                        // Ignored in this excerpt; the next changed key is still processed.
                    }
                }
                for (CacheData cacheData : cacheDatas) {
                    if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        // Per the article: compares the listeners' md5 with the current one,
                        // updates it when different, and notifies listeners through
                        // receiveConfigInfo when there is a change.
                        cacheData.checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList.clear();
            } catch (Throwable e) {
                // Ignored in this excerpt; the task is still re-submitted below.
            } finally {
                // Re-submit this task so that polling continues in a loop.
                executorService.execute(this);
            }
        }
    }

checkUpdateDataIds代码中调用 checkUpdateConfigStr


    /**
     * Asks the server which of the probed configurations have changed. Per the original
     * note, only dataId and group are meaningful in the returned entries, and the result
     * is never {@code null}.
     *
     * @param probeUpdateString       probe payload; per the original comment it is built from
     *                                dataId, group, md5 and tenant
     * @param isInitializingCacheList when {@code true}, an extra header tells the server not
     *                                to hold the request
     * @return the changed group keys parsed from the response, or an empty list when the
     *         probe string is blank, the response code is not HTTP 200, or an
     *         {@code IOException} occurs
     */
List<String> checkUpdateConfigStr(String probeUpdateString, 
                                  boolean isInitializingCacheList) {
    final long timeout = TimeUnit.SECONDS.toMillis(30L); // 30-second timeout
    final List<String> params =
        Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);

    final List<String> headers = new ArrayList<String>(2);
    headers.add("Long-Pulling-Timeout");
    headers.add(String.valueOf(timeout));
    // Tell the server not to hang the request up when newly initializing cache data
    // was added (see the server-side implementation).
    if (isInitializingCacheList) {
        headers.add("Long-Pulling-Timeout-No-Hangup");
        headers.add("true");
    }

    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }

    try {
        // POST to the config controller's "/listener" endpoint to learn what changed.
        final HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
            agent.getEncode(), timeout);
        final boolean healthy = HttpURLConnection.HTTP_OK == result.code;
        setHealthServer(healthy);
        if (healthy) {
            // The response lists the group keys that were updated.
            return parseUpdateDataIdResponse(result.content);
        }
    } catch (IOException e) {
        // Ignored in this excerpt; falls through to the empty result.
    }
    return Collections.emptyList();
}

checkLocalConfig方法

    /**
     * Synchronizes a cache entry with its local failover file.
     * <ul>
     *   <li>File exists and the entry is not using local config, or the file's
     *       last-modified time differs from the recorded version: load the file content
     *       into the entry and mark it as using local config.</li>
     *   <li>File is gone while the entry was using local config: clear the flag only;
     *       per the original note, listeners are notified later, once the configuration
     *       has been fetched from the server.</li>
     *   <li>Otherwise: nothing changes.</li>
     * </ul>
     *
     * @param cacheData the entry to synchronize; its dataId, group and tenant locate the file
     */
    private void checkLocalConfig(CacheData cacheData) {
        final String dataId = cacheData.dataId;
        final String group = cacheData.group;
        final String tenant = cacheData.tenant;
        // Per the article, the file is looked up under LOCAL_SNAPSHOT_PATH, i.e.
        // System.getProperty("JM.SNAPSHOT.PATH", System.getProperty("user.home"))
        // + "/nacos/config/data" — TODO confirm against LocalConfigInfoProcessor.
        final File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);

        // Take one snapshot of both conditions so that every decision below is
        // based on the same state.
        final boolean usingLocal = cacheData.isUseLocalConfigInfo();
        final boolean failoverExists = path.exists();

        if (!failoverExists) {
            // Had a file -> no file.
            if (usingLocal) {
                cacheData.setUseLocalConfigInfo(false);
            }
            return;
        }

        // No file -> file, or the file was modified since it was last loaded.
        if (!usingLocal || cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
            final String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);
        }
    }

服务端代码导读

API接口/nacos/v1/cs/configs/listener,接口代码

    /**
     * Long-polling endpoint: decodes the client's {@code Listening-Configs} parameter
     * into a map of client-side md5 values and hands the request to {@code inner}.
     *
     * @param request  must carry a non-blank {@code Listening-Configs} parameter
     * @param response passed through to {@code inner.doPollingConfig}
     * @throws IllegalArgumentException when the parameter is blank or cannot be parsed;
     *         in the latter case the parsing failure is attached as the cause
     * @throws ServletException declared; the throwing call is not identifiable from this excerpt
     * @throws IOException      declared; the throwing call is not identifiable from this excerpt
     */
    @RequestMapping(value = "/listener", method = RequestMethod.POST)
    public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        // The group keys sent by the client.
        String probeModify = request.getParameter("Listening-Configs");
        if (StringUtils.isBlank(probeModify)) {
            throw new IllegalArgumentException("invalid probeModify");
        }
        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
        Map<String, String> clientMd5Map;
        try {
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            // Keep the original failure as the cause so the parse error stays diagnosable.
            throw new IllegalArgumentException("invalid probeModify", e);
        }
        // Do long-polling; inner is the ConfigServletInner.
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }

ConfigServletInner实现

public class ConfigServletInner {
    @Autowired
    private LongPollingService longPollingService;
    @Autowired
    private PersistService persistService;
    private static final int TRY_GET_LOCK_TIMES = 9;
    private static final int START_LONGPOLLING_VERSION_NUM = 204;
    /**
     * Polling entry point. When the request supports long polling it is registered with
     * {@code longPollingService}; the short-polling path is omitted in this excerpt.
     *
     * @param request          the client's polling request
     * @param response         the response associated with the request
     * @param clientMd5Map     md5 values parsed from the client's probe string
     * @param probeRequestSize length of the decoded probe string, as passed by the caller
     * @return {@code HttpServletResponse.SC_OK} as a string on both visible paths
     */
    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize)
        throws IOException, ServletException {
        // Long polling.
        if (LongPollingService.isSupportLongPolling(request)) {
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }
       // ... short-polling code omitted in this excerpt
        return HttpServletResponse.SC_OK + "";
    }

longPollingService.addLongPollingClient

    /**
     * Registers a long-polling client. Unless fixed polling is enabled, the client's md5
     * values are compared first: if anything changed, the response is generated at once;
     * if nothing changed and the no-hang-up header is "true", the method returns without
     * holding the request. Otherwise the request is switched to async mode and a
     * {@code ClientLongPolling} task is handed to the scheduler.
     *
     * @param req              the polling request; its long-polling header supplies the client timeout
     * @param rsp              the response to write changed groups to
     * @param clientMd5Map     md5 values reported by the client
     * @param probeRequestSize size of the client's probe string
     */
    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        // Respond delayTime ms before the client's timeout so the client does not time out
        // first (original note: changed by @qiaoyi.dingqy 2013.10.22, add delay time for
        // LoadBalance). Never wait less than 10000 ms.
        // NOTE(review): Long.parseLong(str) throws NumberFormatException when the header is
        // missing; presumably isSupportLongPolling has already verified it — confirm.
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
        } else {
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                // Something already changed: answer immediately instead of holding the request.
                generateResponse(req, rsp, changedGroups);
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                // The client asked not to be held; return without registering it.
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        // Must be called from the HTTP thread; otherwise the container sends the response
        // as soon as this method returns.
        final AsyncContext asyncContext = req.startAsync();
        // AsyncContext.setTimeout() is not accurate, so the timeout is handled manually.
        asyncContext.setTimeout(0L);
        // The task schedules its own timeout handling after `timeout` ms; see
        // ClientLongPolling.run.
        scheduler.execute(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }

ClientLongPolling代码

/**
 * Captures the async context and request metadata of one held long-polling request
 * and records the creation time.
 */
ClientLongPolling(AsyncContext ac,
                  Map<String, String> clientMd5Map,
                  String ip,
                  int probeRequestSize,
                  long timeoutTime,
                  String appName,
                  String tag) {
    this.asyncContext = ac;
    this.clientMd5Map = clientMd5Map;
    this.ip = ip;
    this.probeRequestSize = probeRequestSize;
    this.timeoutTime = timeoutTime;
    this.appName = appName;
    this.tag = tag;
    this.createTime = System.currentTimeMillis();
}
/**
 * Schedules the timeout handler for this held request and then registers this client
 * in {@code allSubs} (described by the article as a {@code Queue<ClientLongPolling>}
 * of pending client requests).
 *
 * When the timeout fires, the client is removed from {@code allSubs}; under fixed
 * polling the md5 values are compared and any changed groups are sent, otherwise an
 * empty response is sent.
 */
public void run() {
    final Runnable onTimeout = new Runnable() {
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                // Drop the subscription.
                allSubs.remove(ClientLongPolling.this);
                if (!isFixedPolling()) {
                    sendResponse(null);
                    return;
                }
                // Find the changed group keys by comparing md5 values.
                List<String> changedGroups = MD5Util.compareMd5(
                    (HttpServletRequest)asyncContext.getRequest(),
                    (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                if (changedGroups.size() > 0) {
                    sendResponse(changedGroups);
                } else {
                    sendResponse(null);
                }
            } catch (Throwable t) {
                // Ignored in this excerpt.
            }
        }
    };
    asyncTimeoutFuture = scheduler.schedule(onTimeout, timeoutTime, TimeUnit.MILLISECONDS);
    allSubs.add(this);
}

配置修改API

    /**
     * Configuration publish endpoint (parameters and validation are elided in this
     * excerpt). Persists the configuration through one of three paths — normal, tagged,
     * or beta — fires a {@code ConfigDataChangeEvent} after each, logs a persistence
     * trace event, and returns {@code true}.
     */
    @RequestMapping(method = RequestMethod.POST)
    @ResponseBody
    public Boolean publishConfig(...)
        throws NacosException {
       //...
        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                // Normal publish: no beta IPs and no tag.
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                // Key step: follow EventDispatcher.fireEvent to see how the change is propagated.
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                // Tagged publish.
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }

EventDispatcher.fireEvent

    /**
     * Delivers the event to every listener registered for the event's class. An
     * exception thrown by one listener is logged and does not stop delivery to the
     * remaining listeners.
     *
     * @param event the event to dispatch
     * @throws IllegalArgumentException when {@code event} is {@code null}
     */
    static public void fireEvent(Event event) {
        if (event == null) {
            throw new IllegalArgumentException();
        }
        // Look up the listeners registered for this event type and notify each one.
        for (AbstractEventListener subscriber : getEntry(event.getClass()).listeners) {
            try {
                subscriber.onEvent(event);
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }
    }

而listeners维护在Entry中,Entry从LISTENER_HUB中获取,而LISTENER_HUB则通过addEventListener添加listener

// The AbstractEventListener constructor calls EventDispatcher.addEventListener, so
// look at the implementations of AbstractEventListener to find the registered listeners.
/**
 * Base class for event listeners (excerpt). Every instance registers itself with
 * {@code EventDispatcher} when it is constructed.
 */
static public abstract class AbstractEventListener {
    public AbstractEventListener() {
        // Self-registration: passes this instance to the dispatcher from within the constructor.
        EventDispatcher.addEventListener(this);
    }
}

查看LongPollingService:当客户端添加了listener时,由allSubs维护订阅关系;而配置变更时会触发onEvent事件,onEvent执行DataChangeTask任务。

    /**
     * Reacts to a local data change by scheduling a {@code DataChangeTask}. Events are
     * ignored under fixed polling, and so is any event that is not a
     * {@code LocalDataChangeEvent}.
     */
    @Override
    public void onEvent(Event event) {
        if (isFixedPolling()) {
            // ignore
            return;
        }
        if (!(event instanceof LocalDataChangeEvent)) {
            return;
        }
        LocalDataChangeEvent changeEvent = (LocalDataChangeEvent)event;
        scheduler.execute(new DataChangeTask(changeEvent.groupKey, changeEvent.isBeta, changeEvent.betaIps));
    }

DataChangeTask 实现

        /**
         * Walks all pending long-polling subscriptions and, for each client whose md5 map
         * contains this task's groupKey, removes the subscription and sends the changed
         * key back to that client. Part of the matching logic is elided in this excerpt.
         */
        public void run() {
            try {
                // Return value is discarded here; the reason is not visible in this excerpt.
                ConfigService.getContentBetaMd5(groupKey);
                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPolling clientSub = iter.next();
                    // Find the client tasks that subscribed to this groupKey.
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {
                        // ...
                        getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                        iter.remove(); // drop the subscription
                        // Answer the held long-polling request to notify the client of the change.
                        clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
            } catch (Throwable t) {
                // Ignored in this excerpt.
            }
        }

相关文章

网友评论

    本文标题:SpringCloud Alibaba Nacos 配置中心源码

    本文链接:https://www.haomeiwen.com/subject/tcrmoqtx.html