美文网首页
xxl-系列

xxl-系列

作者: 93张先生 | 来源:发表于2019-11-21 21:43 被阅读0次

xxl-register

register-client

XxlRegisteryBaseClient

功能:

提供最简单的register remove discovery monitor方法,向注册中心发送请求并返回结果.

XxlRegisteryClient

属性:

1.registeryData (HashSet):要注册的数据;
2.discoveryData (ConcurrentHashMap<String,TreeSet<<String>>) :需要发现的数据;

后台线程:

1.registeryThread:一个注册的后台线程:向注册中心注册数据,每隔10秒钟注册一次;通过Thread.sleep()实现;
2.discoveryThread:一个发现的后台线程:没有要查找的数据,sleep(3 second);有要查找的数据,监控注册中心数据;然后sleep(10 second);刷新注册的数据;通过已经缓存数据的集合对比,没有改变不需要更新;

功能:

regisitery:注册数据加入缓存,调用基础的注册方法(registryBaseClient.registry());
remove:移除数据移除缓存,调用基础的移除方法(registryBaseClient.remove());
discovery:先从缓存查询注册的数据,数据不一致,然后去刷新注册中心的数据,根据更新的数据,刷新本地缓存;

// registry thread
registryThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!registryThreadStop) {
            try {
                if (registryData.size() > 0) {

                    boolean ret = registryBaseClient.registry(new ArrayList<XxlRegistryDataParamVO>(registryData));
                    logger.debug(">>>>>>>>>>> xxl-registry, refresh registry data {}, registryData = {}", ret?"success":"fail",registryData);
                }
            } catch (Exception e) {
                if (!registryThreadStop) {
                    logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                }
            }
            try {
                // per 10 seconds register once
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                if (!registryThreadStop) {
                    logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-registry, registryThread stoped.");
    }
});
 // discovery thread
    discoveryThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!registryThreadStop) {

                if (discoveryData.size() == 0) {
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (Exception e) {
                        if (!registryThreadStop) {
                            logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
                        }
                    }
                } else {
                    try {
                        // monitor
                        boolean monitorRet = registryBaseClient.monitor(discoveryData.keySet());

                        // avoid fail-retry request too quick
                        if (!monitorRet){
                            TimeUnit.SECONDS.sleep(10);
                        }

                        // refreshDiscoveryData, all
                        refreshDiscoveryData(discoveryData.keySet());
                    } catch (Exception e) {
                        if (!registryThreadStop) {
                            logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
                        }
                    }
                }

            }
            logger.info(">>>>>>>>>>> xxl-registry, discoveryThread stoped.");
        }
    });

/**
 * refreshDiscoveryData, some or all
 */
private void refreshDiscoveryData(Set<String> keys){
    if (keys==null || keys.size() == 0) {
        return;
    }

    // discovery mult
    Map<String, TreeSet<String>> updatedData = new HashMap<>();

    Map<String, TreeSet<String>> keyValueListData = registryBaseClient.discovery(keys);
    if (keyValueListData!=null) {
        for (String keyItem: keyValueListData.keySet()) {

            // list > set
            TreeSet<String> valueSet = new TreeSet<>();
            valueSet.addAll(keyValueListData.get(keyItem));

            // valid if updated
            boolean updated = true;
            TreeSet<String> oldValSet = discoveryData.get(keyItem);
            if (oldValSet!=null && BasicJson.toJson(oldValSet).equals(BasicJson.toJson(valueSet))) {
                updated = false;
            }

            // set
            if (updated) {
                discoveryData.put(keyItem, valueSet);
                updatedData.put(keyItem, valueSet);
            }

        }
    }

    if (updatedData.size() > 0) {
        logger.info(">>>>>>>>>>> xxl-registry, refresh discovery data finish, discoveryData(updated) = {}", updatedData);
    }
    logger.debug(">>>>>>>>>>> xxl-registry, refresh discovery data finish, discoveryData = {}", discoveryData);
}

register-admin

register

服务注册 & 续约 API

说明:新服务注册上线1s内广播通知接入方;需要接入方循环续约,否则服务将会过期(三倍于注册中心心跳时间)下线;
将消息实体放入LinkedBlockingQueue registerQueue;

remove

服务摘除 API
说明:新服务摘除下线1s内广播通知接入方;
将消息实体放入LinkedBlockingQueue registerQueue;

discovery

说明:查询在线服务地址列表;
通过磁盘文件发现注册服务

monitor

说明:long-polling 接口,主动阻塞一段时间(三倍于注册中心心跳时间);直至阻塞超时或服务注册信息变动时响应;

token使用:

每次请求(register remove discovery ...)都携带token;服务端token和客户端token进行比对;查看是否一致合法;

if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
    return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
客户端和服务端心跳频率:10s;

信息注册:

服务端通过广播机制实时同步服务注册信息向客户端;客户端连接服务端的long polling技术: 新注册和移除的服务 1 秒内通知客户端;客户端和服务端实时通信,进行续约;
文件和数据库内容的一致性:先处理数据库,然后处理磁盘文件;先更新数据库,然后写更新事件到registery message,最后通过广播线程处理registery message;

discovery:只从文件读取;一致性都是按照磁盘文件为准;
registry():加入registryQueue;
remove():加入removeQueue;
discovery():只从文件获取注册数据信息;
monitor():

返回DeferredResult;他是一个返回延迟结果的对象;它的结果在brocadcast Thread中处理;有结果或者超时自动返回结果.从registryDeferredResultMap中获取list,每一个key,有一个List<DeferredResult>> list;逐个处理list中的DeferredResult对象.
private Map<String, List<DeferredResult>> registryDeferredResultMap = new ConcurrentHashMap<>();

// brocast monitor client
List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
if (deferredResultList != null) {
    registryDeferredResultMap.remove(fileName);
    for (DeferredResult deferredResult: deferredResultList) {
        deferredResult.setResult(new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor key update."));
    }
}
10个注册线程:

加入registryQueue;先更新registery data,然后更新registery,最后添加registery message;

10个移除线程:

加入removeQueue;先删除registery data,然后更新registery,最后添加registery message;

一个广播线程:

广播线程从registeryMessage表获取事件(registry,remove....)信息,然后同步注册器磁盘文件和处理客户端监控结果,并返回客户端;客户端结果,两种情况;1. 成功;2. 3个心跳时间(10s)超时返回;

一个旧数据清楚线程:

清除registery data 10 * 3 时间之前的;
然后更新registery数据库;
然后更新registery磁盘文件;
最后删除磁盘文件;

集群信息共享:
通过同一个数据库实例实现;

相关文章

  • xxl-系列

    xxl-register register-client XxlRegisteryBaseClient 功能: 提...

  • 亲手做过的美食们

    早餐系列一 系列二 午餐系列一 系列二 系列三....

  • 包包

    小清新学院风系列 古风系列 黑色系列 粉色系列 白色系列 可爱系列 学院风系列 双肩包系列

  • 2018我的彩铅

    多肉植物系列 美食系列 动物系列 美发系列 人物系列 其他

  • 有趣的图

    浣熊系列 猫系列 小猫与纸窗帘系列 迷人的狗狗系列 人物系列 仙女系列 娃娃与鸽子树系列 陌生的动物系列 南风包包...

  • 文章系列初稿

    育儿系列 家庭主妇系列 职场观察系列 小说系列 生活感悟情感系列 美图系列 读书读后感及总结系列 婆媳关系系列 专...

  • 鞋子2

    黑色系列 粉色系列 卡其色系列 白色系列 黑色系列 杏色系列 其他色系

  • 写作自检清单

    博客搭建系列Markdown教程Git教程系列基础教学系列linux 命令教学系列配置系列太基础教程系列广告系列诸...

  • 淑女系列

    冬装系列 毛呢大衣系列 短外套系列,秋 夏装系列

  • linux符号整理-正则整理

    引号符号系列: 重定向符号系列: 特殊符号系列: 通配符系列匹配文件内容信息 正则符号系列: 扩展正则系列:

网友评论

      本文标题:xxl-系列

      本文链接:https://www.haomeiwen.com/subject/duhnictx.html