美文网首页
xxl-系列

xxl-系列

作者: 93张先生 | 来源:发表于2019-11-21 21:43 被阅读0次

    xxl-register

    register-client

    XxlRegisteryBaseClient

    功能:

    提供最简单的register remove discovery monitor方法,向注册中心发送请求并返回结果.

    XxlRegisteryClient

    属性:

    1.registeryData (HashSet):要注册的数据;
    2.discoveryData (ConcurrentHashMap<String,TreeSet<<String>>) :需要发现的数据;

    后台线程:

    1.registeryThread:一个注册的后台线程:向注册中心注册数据,每隔10秒钟注册一次;通过Thread.sleep()实现;
    2.discoveryThread:一个发现的后台线程:没有要查找的数据,sleep(3 second);有要查找的数据,监控注册中心数据;然后sleep(10 second);刷新注册的数据;通过已经缓存数据的集合对比,没有改变不需要更新;

    功能:

    regisitery:注册数据加入缓存,调用基础的注册方法(registryBaseClient.registry());
    remove:移除数据移除缓存,调用基础的移除方法(registryBaseClient.remove());
    discovery:先从缓存查询注册的数据,数据不一致,然后去刷新注册中心的数据,根据更新的数据,刷新本地缓存;

    // registry thread
    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!registryThreadStop) {
                try {
                    if (registryData.size() > 0) {
    
                        boolean ret = registryBaseClient.registry(new ArrayList<XxlRegistryDataParamVO>(registryData));
                        logger.debug(">>>>>>>>>>> xxl-registry, refresh registry data {}, registryData = {}", ret?"success":"fail",registryData);
                    }
                } catch (Exception e) {
                    if (!registryThreadStop) {
                        logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                    }
                }
                try {
                    // per 10 seconds register once
                    TimeUnit.SECONDS.sleep(10);
                } catch (Exception e) {
                    if (!registryThreadStop) {
                        logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-registry, registryThread stoped.");
        }
    });
    
     // discovery thread
        discoveryThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!registryThreadStop) {
    
                    if (discoveryData.size() == 0) {
                        try {
                            TimeUnit.SECONDS.sleep(3);
                        } catch (Exception e) {
                            if (!registryThreadStop) {
                                logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
                            }
                        }
                    } else {
                        try {
                            // monitor
                            boolean monitorRet = registryBaseClient.monitor(discoveryData.keySet());
    
                            // avoid fail-retry request too quick
                            if (!monitorRet){
                                TimeUnit.SECONDS.sleep(10);
                            }
    
                            // refreshDiscoveryData, all
                            refreshDiscoveryData(discoveryData.keySet());
                        } catch (Exception e) {
                            if (!registryThreadStop) {
                                logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
                            }
                        }
                    }
    
                }
                logger.info(">>>>>>>>>>> xxl-registry, discoveryThread stoped.");
            }
        });
    
    
    /**
     * refreshDiscoveryData, some or all
     */
    private void refreshDiscoveryData(Set<String> keys){
        if (keys==null || keys.size() == 0) {
            return;
        }
    
        // discovery mult
        Map<String, TreeSet<String>> updatedData = new HashMap<>();
    
        Map<String, TreeSet<String>> keyValueListData = registryBaseClient.discovery(keys);
        if (keyValueListData!=null) {
            for (String keyItem: keyValueListData.keySet()) {
    
                // list > set
                TreeSet<String> valueSet = new TreeSet<>();
                valueSet.addAll(keyValueListData.get(keyItem));
    
                // valid if updated
                boolean updated = true;
                TreeSet<String> oldValSet = discoveryData.get(keyItem);
                if (oldValSet!=null && BasicJson.toJson(oldValSet).equals(BasicJson.toJson(valueSet))) {
                    updated = false;
                }
    
                // set
                if (updated) {
                    discoveryData.put(keyItem, valueSet);
                    updatedData.put(keyItem, valueSet);
                }
    
            }
        }
    
        if (updatedData.size() > 0) {
            logger.info(">>>>>>>>>>> xxl-registry, refresh discovery data finish, discoveryData(updated) = {}", updatedData);
        }
        logger.debug(">>>>>>>>>>> xxl-registry, refresh discovery data finish, discoveryData = {}", discoveryData);
    }
    

    register-admin

    register

    服务注册 & 续约 API

    说明:新服务注册上线1s内广播通知接入方;需要接入方循环续约,否则服务将会过期(三倍于注册中心心跳时间)下线;
    将消息实体放入LinkedBlockingQueue registerQueue;

    remove

    服务摘除 API
    说明:新服务摘除下线1s内广播通知接入方;
    将消息实体放入LinkedBlockingQueue registerQueue;

    discovery

    说明:查询在线服务地址列表;
    通过磁盘文件发现注册服务

    monitor

    说明:long-polling 接口,主动阻塞一段时间(三倍于注册中心心跳时间);直至阻塞超时或服务注册信息变动时响应;

    token使用:

    每次请求(register remove discovery ...)都携带token;服务端token和客户端token进行比对;查看是否一致合法;

    if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
    }
    
    客户端和服务端心跳频率:10s;

    信息注册:

    服务端通过广播机制实时同步服务注册信息向客户端;客户端连接服务端的long polling技术: 新注册和移除的服务 1 秒内通知客户端;客户端和服务端实时通信,进行续约;
    文件和数据库内容的一致性:先处理数据库,然后处理磁盘文件;先更新数据库,然后写更新事件到registery message,最后通过广播线程处理registery message;

    discovery:只从文件读取;一致性都是按照磁盘文件为准;
    registry():加入registryQueue;
    remove():加入removeQueue;
    discovery():只从文件获取注册数据信息;
    monitor():

    返回DeferredResult;他是一个返回延迟结果的对象;它的结果在brocadcast Thread中处理;有结果或者超时自动返回结果.从registryDeferredResultMap中获取list,每一个key,有一个List<DeferredResult>> list;逐个处理list中的DeferredResult对象.
    private Map<String, List<DeferredResult>> registryDeferredResultMap = new ConcurrentHashMap<>();

    // brocast monitor client
    List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
    if (deferredResultList != null) {
        registryDeferredResultMap.remove(fileName);
        for (DeferredResult deferredResult: deferredResultList) {
            deferredResult.setResult(new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor key update."));
        }
    }
    
    10个注册线程:

    加入registryQueue;先更新registery data,然后更新registery,最后添加registery message;

    10个移除线程:

    加入removeQueue;先删除registery data,然后更新registery,最后添加registery message;

    一个广播线程:

    广播线程从registeryMessage表获取事件(registry,remove....)信息,然后同步注册器磁盘文件和处理客户端监控结果,并返回客户端;客户端结果,两种情况;1. 成功;2. 3个心跳时间(10s)超时返回;

    一个旧数据清楚线程:

    清除registery data 10 * 3 时间之前的;
    然后更新registery数据库;
    然后更新registery磁盘文件;
    最后删除磁盘文件;

    集群信息共享:
    通过同一个数据库实例实现;

    相关文章

      网友评论

          本文标题:xxl-系列

          本文链接:https://www.haomeiwen.com/subject/duhnictx.html