美文网首页
客户端分析

客户端分析

作者: Spring_java | 来源:发表于2021-09-02 18:02 被阅读0次

1: 客户端获取配置

Nacos 服务端保存了配置信息,客户端连接到服务端之后,根据 dataID,group可以获取到具体的配置信息,当服务端的配置发生变更时,客户端会收到通知。
参考文章:
链接:https://www.jianshu.com/p/38b5452c9fec
https://blog.csdn.net/wangwei19871103/article/details/105762802?spm=1001.2014.3001.5502

客户端注册监听 服务端 的本质 就是带着配置和配置值的 MD5 值和后台对比。如果 MD5 值不一致,就立即返回不一致的配置。如果值一致,就等待住 30 秒。返回值为空。

1.1 看看Nacos是如何获取服务端的最新数据

主要是通过NacosConfigService 来实现 及时获取服务端的最新数据。构造函数里面构建了HttpAgent和
ClientWorker。

    public ClientWorker(final HttpAgent agent, ConfigFilterChainManager configFilterChainManager, Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;
        this.init(properties);
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        this.executor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    ClientWorker.this.checkConfigInfo();
                } catch (Throwable var2) {
                    ClientWorker.LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", var2);
                }

            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

ClientWorker 除了将 HttpAgent 维持在自己内部,还创建了两个线程池:

  • 第一个线程池是只拥有一个线程用来执行定时任务的 executor,executor 每隔 10ms 就会执行一次 checkConfigInfo() 方法,从方法名上可以知道是每 10 ms 检查一次配置信息。
  • 第二个线程池是一个普通的线程池,从 ThreadFactory 的名称可以看到这个线程池是做长轮询的。

长轮训任务 LongPollingRunnable的run

首先会遍历所有的CacheData,找出是当前任务的加入到一个集合里,如果是本次任务的就先检查本地配置,如果有改变的话就要通知监听器。如果是有在初始化的CacheData,那服务器就会立即返回,否则会被挂起,这个原因就是为了不进行频繁的空轮询,又能实现动态配置,只要在挂起的时间段内有改变,就可以理解响应给客户端。获取完之后再检查有没改变,有的话也要通知,然后继续调度当前任务。
接下来去请求服务器获取配置信息。

 @Override
        public void run() {
            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            //是否是在初始化的CacheData,会影响服务器是否挂起或者立即返回
            List<String> inInitializingCacheList = new ArrayList<String>();
            try {
                // check failover config
                for (CacheData cacheData : cacheMap.get().values()) {
                    if (cacheData.getTaskId() == taskId) {//属于当前长轮询任务的
                        cacheDatas.add(cacheData);
                        try {
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) {//用本地配置
                                cacheData.checkListenerMd5();//有改变的话会通知
                            }
                        } catch (Exception e) {
                            LOGGER.error("get local config info error", e);
                        }
                    }
                }
                //获取有变化的配置列表dataid+group,访问的url是/listener
                // check server config
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {//有更新的就获取一次配置
                        String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(ct[0]);//设置配置内容
                        if (null != ct[1]) {
                            cache.setType(ct[1]);//设置配置类型
                        }
                       ...
                    } catch (NacosException ioe) {
                        ...
                    }
                }
                for (CacheData cacheData : cacheDatas) {//不是初始化中的或者初始化集合里存在的
                    if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        cacheData.checkListenerMd5();//检查是否有变化,有变化就通知
                        cacheData.setInitializing(false);//请求过了后就设置为不在初始化中,这样就会被挂起,如果服务器配置有更新,就会立即返回,这样就可以实现动态配置更新,又不会太多的空轮询消耗
                    }
                }
                inInitializingCacheList.clear();
                executorService.execute(this);
            } catch (Throwable e) {
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }

  • checkUpdateDataIds获取有改变的dataId 列表,
  • 通过 getServerConfig 方法,根据 dataId 到服务端获取最新的配置信息,接着将最新的配置信息保存到 CacheData 中。
CacheData cache = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId, group, tenant));
 cache.setContent(content);
  • 当服务端配置信息发生变更时,客户端将最新的数据获取下来之后,保存在了 CacheData 中,同时更新了该 CacheData 的 md5 值,所以当下次执行 checkListenerMd5 方法时,就会发现当前 listener 所持有的 md5 值已经和 CacheData 的 md5 值不一样了,也就意味着服务端的配置信息发生改变了,这时就需要将最新的数据通知给 Listener 的持有者。

checkListenerMd5 检查MD5值

    void checkListenerMd5() {
        Iterator var1 = this.listeners.iterator();
        while(var1.hasNext()) {
            ManagerListenerWrap wrap = (ManagerListenerWrap)var1.next();
            if (!this.md5.equals(wrap.lastCallMd5)) {
                this.safeNotifyListener(this.dataId, this.group, this.content, this.md5, wrap);
            }
        }
    }

该方法会检查 CacheData 当前的 md5 与 CacheData 持有的所有 Listener 中保存的 md5 的值是否一致,如果不一致,就执行一个安全的监听器的通知方法:safeNotifyListener,通知什么呢?我们可以大胆的猜一下,应该是通知 Listener 的使用者,该 Listener 所关注的配置信息已经发生改变了。

总结:
Nacos 服务端创建了相关的配置项后,客户端就可以进行监听了。
客户端是通过一个定时任务来检查自己监听的配置项的数据的,一旦服务端的数据发生变化时,客户端将会获取到最新的数据,并将最新的数据保存在一个 CacheData 对象中,然后会重新计算 CacheData 的 md5 属性的值,此时就会对该 CacheData 所绑定的 Listener 触发 receiveConfigInfo 回调。
考虑到服务端故障的问题,客户端将最新数据获取后会保存在本地的 snapshot 文件中,以后会优先从文件中获取配置信息的值。

相关文章

网友评论

      本文标题:客户端分析

      本文链接:https://www.haomeiwen.com/subject/ynhcwltx.html