美文网首页
nacos源码3-服务管理-客户端

nacos源码3-服务管理-客户端

作者: modou1618 | 来源:发表于2019-01-13 23:27 被阅读0次

    一 入口

    1.1 NamingFactory工厂类

    • 获取类构造函数
    • 通过反射创建服务管理客户端
    public class NamingFactory {
    
        public static NamingService createNamingService(String serverList) throws NacosException {
            try {
                Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
                Constructor constructor = driverImplClass.getConstructor(String.class);
                NamingService vendorImpl = (NamingService)constructor.newInstance(serverList);
                return vendorImpl;
            } catch (Throwable e) {
                throw new NacosException(-400, e.getMessage());
            }
        }
    
        public static NamingService createNamingService(Properties properties) throws NacosException {
            try {
                Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
                Constructor constructor = driverImplClass.getConstructor(Properties.class);
                NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
                return vendorImpl;
            } catch (Throwable e) {
                throw new NacosException(-400, e.getMessage());
            }
        }
    }
    

    1.2 NacosNamingService

    NacosNamingService.png
    • namespace 名字空间
    • endpoint 服务管理服务端地址管理服务器地址,获取服务管理服务端地址
    • serverList 服务管理服务端地址,可直接配置,或从endpoint获取
    • cacheDir 调用服务信息本地文件缓存地址
    • logName 暂未使用
    • HostReactor 客户端关心的服务的实例信息,推拉模式的更新,failover服务实例信息读写管理
    • BeatReactor 本地实例信息心跳
    • EventDispatcher 服务信息变更监听回调处理
    • NamingProxy 服务管理服务端地址列表更新管理,接口调用负载均衡,失败重试

    二 NamingProxy

    2.1 HttpClient

    • NamingProxy通过HttpClient发送http请求
    • 基于HttpURLConnection封装http请求发送
    • 按encoding编码请求参数,编码类型作为第一个参数。格式类似于encoding=encodingType&k1=encoding(v1)&k2=enconding(v2),
    String encodedContent = encodingParams(paramValues, encoding);
    private static String encodingParams(Map<String, String> params, String encoding)
            throws UnsupportedEncodingException {
        StringBuilder sb = new StringBuilder();
        if (null == params || params.isEmpty()) {
            return null;
        }
    
        params.put("encoding", encoding);
    
        for (Map.Entry<String, String> entry : params.entrySet()) {
            if (StringUtils.isEmpty(entry.getValue())) {
                continue;
            }
    
            sb.append(entry.getKey()).append("=");
            sb.append(URLEncoder.encode(entry.getValue(), encoding));
            sb.append("&");
        }
    
        return sb.toString();
    }
    
    • 按照get方式组装请求url。
      url += (null == encodedContent) ? "" : ("?" + encodedContent);
    • 初始化HttpURLConnection.设置连接超时3s,读写超时50s,请求方法,请求头
    conn = (HttpURLConnection) new URL(url).openConnection();
    conn.setConnectTimeout(CON_TIME_OUT_MILLIS);
    conn.setReadTimeout(TIME_OUT_MILLIS);
    conn.setRequestMethod(method);
    setHeaders(conn, headers, encoding);
    
    • 设置请求头
    private static void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) {
        if (null != headers) {
            for (Iterator<String> iter = headers.iterator(); iter.hasNext(); ) {
                conn.addRequestProperty(iter.next(), iter.next());
            }
        }
    
        conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset="
                + encoding);
        conn.addRequestProperty("Accept-Charset", encoding);
    }
    
    • 和服务端建立连接 ,发送请求
      conn.connect();
    • 获取请求响应结果
      响应码,响应头,响应结果数据
    return getResult(conn);
    
    private static HttpResult getResult(HttpURLConnection conn) throws IOException {
        int respCode = conn.getResponseCode();
    
        InputStream inputStream;
        if (HttpURLConnection.HTTP_OK == respCode
                || HttpURLConnection.HTTP_NOT_MODIFIED == respCode) {
            inputStream = conn.getInputStream();
        } else {
            inputStream = conn.getErrorStream();
        }
    
        Map<String, String> respHeaders = new HashMap<String, String>(conn.getHeaderFields().size());
        for (Map.Entry<String, List<String>> entry : conn.getHeaderFields().entrySet()) {
            respHeaders.put(entry.getKey(), entry.getValue().get(0));
        }
    
        String encodingGzip = "gzip";
    
        if (encodingGzip.equals(respHeaders.get(HttpHeaders.CONTENT_ENCODING))) {
            inputStream = new GZIPInputStream(inputStream);
        }
    
        return new HttpResult(respCode, IoUtils.toString(inputStream, getCharset(conn)), respHeaders);
    }
    

    2.2 NamingProxy

    2.2.1 服务管理服务端地址管理

    • 配置的服务端地址列表serverList优先于地址服务器获取的服务端地址列表
    • 启动30s周期任务,定时从地址服务器获取服务端地址列表

    2.2.2 接口

    • 注册/注销服务实例接口
    • 服务实例列表查询接口
    • 服务端健康状态检测接口
    • 服务列表接口
    • reqAPI调用callServer发送请求。失败则通过遍历服务管理服务端地址方式依次失败重试服务端直到成功,
    • callServer封装HttpClient接口,向指定服务管理服务端发送请求

    三 BeatReactor

    • ConcurrentMap<String, BeatInfo> dom2Beat 本地缓存注册的服务实例信息
    • 定时周期5s任务,遍历dom2Beat,执行服务实例心跳发送任务BeatTask线程。向/api/clientBeat发送服务BeatInfo信息
    • BeatInfo结构,服务实例的ip,端口,权重,集群,服务名或域名,实例元数据信息。
    public class BeatInfo {
        private int port;
        private String ip;
        private double weight;
        private String dom;
        private String cluster;
        private Map<String, String> metadata;
    ...
    }
    
    • 入口registerInstance/deregisterInstance接口调用时添加服务实例信息

    四 EventDispatcher

    • ConcurrentMap<String, List<EventListener>> observerMap服务变更监听
    • BlockingQueue<ServiceInfo> changedServices生产者/消费者方式通知服务信息变更
    • 入口subscribe/unsubscribe接口注册或删除服务变更监听函数
    • 服务变更生产者,首次注册监听函数时调用,或HostReactor中获取到服务变更时调用。
    public void serviceChanged(ServiceInfo serviceInfo) {
        if (serviceInfo == null) {
            return;
        }
    
        changedServices.add(serviceInfo);
    }
    
    • 服务变更消费者
      变更服务监听列表,触发监听回调
    List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
    
    if (!CollectionUtils.isEmpty(listeners)) {
        for (EventListener listener : listeners) {
            List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
            listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts));
        }
    }
    

    五 HostReactor

    5.1 FailoverReactor

    • 手动配置的服务信息及开关,开启开关后failover服务信息优先级最高。failover文件名为服务名,文件内容为服务实例信息
    • ConcurrentMap<String, ServiceInfo> serviceMap 保存failover文件配置的服务信息
    • 5s周期任务SwitchRefresher,动态从本地配置中获取failover配置使用开关,默认关闭failover。
      failover开启,则读取failover服务文件,更新serviceMap
    • 24h周期任务DiskFileWriter,保存HostReactor.serviceInfoMap服务实例信息到failover文件中,后续修改可基于现有数据修改。
    • 初始化时触发一个异步线程,等待10s后调用DiskFileWriter.run()保存HostReactor.serviceInfoMap到failover文件中。

    5.2 PushRecver

    • 推模式,基于udp传输更新服务实例信息
    • 监听udp端口,获取DatagramPacket udp报文,支持gzip方式压缩udp数据PushPacket.
    • PushPacket类型
    类型 功能
    dom 更新客户端存储的服务实例信息,触发变更监听回调
    dump json格式转换hostReactor.serviceInfoMap整体服务实例信息,回传给服务管理服务端。

    5.3 服务实例管理

    服务实例获取流程.png

    5.3.1 服务实例拉模式

    • UpdataTask定时更新服务实例信息


      UpdataTask.png

    六 Balancer

    • 基于实例权重随机选择服务实例
    • vipChooser.refresh(hostsWithWeight); 更新权重信息。
    public class Ref<T> {
            private List<Pair<T>> itemsWithWeight = new ArrayList<Pair<T>>();//带权重原始数据
            private List<T> items = new ArrayList<T>();//实例数组
            private double[] weights;//对应索引实例的权重+所有之前索引实例的权重。
    }
    
    • Instance host = vipChooser.randomWithWeight(); 基于权重随机选择。
      获取随机值,对比Ref.weights中相等的对应项,即为目标实例索引。

    相关文章

      网友评论

          本文标题:nacos源码3-服务管理-客户端

          本文链接:https://www.haomeiwen.com/subject/xghrdqtx.html