xxl-register
register-client
XxlRegisteryBaseClient
功能:
提供最简单的register remove discovery monitor方法,向注册中心发送请求并返回结果.
XxlRegisteryClient
属性:
1.registeryData (HashSet):要注册的数据;
2.discoveryData (ConcurrentHashMap<String,TreeSet<<String>>) :需要发现的数据;
后台线程:
1.registeryThread:一个注册的后台线程:向注册中心注册数据,每隔10秒钟注册一次;通过Thread.sleep()实现;
2.discoveryThread:一个发现的后台线程:没有要查找的数据,sleep(3 second);有要查找的数据,监控注册中心数据;然后sleep(10 second);刷新注册的数据;通过已经缓存数据的集合对比,没有改变不需要更新;
功能:
regisitery:注册数据加入缓存,调用基础的注册方法(registryBaseClient.registry());
remove:移除数据移除缓存,调用基础的移除方法(registryBaseClient.remove());
discovery:先从缓存查询注册的数据,数据不一致,然后去刷新注册中心的数据,根据更新的数据,刷新本地缓存;
// registry thread
registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!registryThreadStop) {
try {
if (registryData.size() > 0) {
boolean ret = registryBaseClient.registry(new ArrayList<XxlRegistryDataParamVO>(registryData));
logger.debug(">>>>>>>>>>> xxl-registry, refresh registry data {}, registryData = {}", ret?"success":"fail",registryData);
}
} catch (Exception e) {
if (!registryThreadStop) {
logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
}
}
try {
// per 10 seconds register once
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!registryThreadStop) {
logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-registry, registryThread stoped.");
}
});
// discovery thread
discoveryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!registryThreadStop) {
if (discoveryData.size() == 0) {
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
if (!registryThreadStop) {
logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
}
}
} else {
try {
// monitor
boolean monitorRet = registryBaseClient.monitor(discoveryData.keySet());
// avoid fail-retry request too quick
if (!monitorRet){
TimeUnit.SECONDS.sleep(10);
}
// refreshDiscoveryData, all
refreshDiscoveryData(discoveryData.keySet());
} catch (Exception e) {
if (!registryThreadStop) {
logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-registry, discoveryThread stoped.");
}
});
/**
* refreshDiscoveryData, some or all
*/
private void refreshDiscoveryData(Set<String> keys){
if (keys==null || keys.size() == 0) {
return;
}
// discovery mult
Map<String, TreeSet<String>> updatedData = new HashMap<>();
Map<String, TreeSet<String>> keyValueListData = registryBaseClient.discovery(keys);
if (keyValueListData!=null) {
for (String keyItem: keyValueListData.keySet()) {
// list > set
TreeSet<String> valueSet = new TreeSet<>();
valueSet.addAll(keyValueListData.get(keyItem));
// valid if updated
boolean updated = true;
TreeSet<String> oldValSet = discoveryData.get(keyItem);
if (oldValSet!=null && BasicJson.toJson(oldValSet).equals(BasicJson.toJson(valueSet))) {
updated = false;
}
// set
if (updated) {
discoveryData.put(keyItem, valueSet);
updatedData.put(keyItem, valueSet);
}
}
}
if (updatedData.size() > 0) {
logger.info(">>>>>>>>>>> xxl-registry, refresh discovery data finish, discoveryData(updated) = {}", updatedData);
}
logger.debug(">>>>>>>>>>> xxl-registry, refresh discovery data finish, discoveryData = {}", discoveryData);
}
register-admin
register
服务注册 & 续约 API
说明:新服务注册上线1s内广播通知接入方;需要接入方循环续约,否则服务将会过期(三倍于注册中心心跳时间)下线;
将消息实体放入LinkedBlockingQueue registerQueue;
remove
服务摘除 API
说明:新服务摘除下线1s内广播通知接入方;
将消息实体放入LinkedBlockingQueue registerQueue;
discovery
说明:查询在线服务地址列表;
通过磁盘文件发现注册服务
monitor
说明:long-polling 接口,主动阻塞一段时间(三倍于注册中心心跳时间);直至阻塞超时或服务注册信息变动时响应;
token使用:
每次请求(register remove discovery ...)都携带token;服务端token和客户端token进行比对;查看是否一致合法;
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
客户端和服务端心跳频率:10s;
信息注册:
服务端通过广播机制实时同步服务注册信息向客户端;客户端连接服务端的long polling技术: 新注册和移除的服务 1 秒内通知客户端;客户端和服务端实时通信,进行续约;
文件和数据库内容的一致性:先处理数据库,然后处理磁盘文件;先更新数据库,然后写更新事件到registery message,最后通过广播线程处理registery message;
discovery:只从文件读取;一致性都是按照磁盘文件为准;
registry():加入registryQueue;
remove():加入removeQueue;
discovery():只从文件获取注册数据信息;
monitor():
返回DeferredResult;他是一个返回延迟结果的对象;它的结果在brocadcast Thread中处理;有结果或者超时自动返回结果.从registryDeferredResultMap中获取list,每一个key,有一个List<DeferredResult>> list;逐个处理list中的DeferredResult对象.
private Map<String, List<DeferredResult>> registryDeferredResultMap = new ConcurrentHashMap<>();
// brocast monitor client
List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
if (deferredResultList != null) {
registryDeferredResultMap.remove(fileName);
for (DeferredResult deferredResult: deferredResultList) {
deferredResult.setResult(new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor key update."));
}
}
10个注册线程:
加入registryQueue;先更新registery data,然后更新registery,最后添加registery message;
10个移除线程:
加入removeQueue;先删除registery data,然后更新registery,最后添加registery message;
一个广播线程:
广播线程从registeryMessage表获取事件(registry,remove....)信息,然后同步注册器磁盘文件和处理客户端监控结果,并返回客户端;客户端结果,两种情况;1. 成功;2. 3个心跳时间(10s)超时返回;
一个旧数据清楚线程:
清除registery data 10 * 3 时间之前的;
然后更新registery数据库;
然后更新registery磁盘文件;
最后删除磁盘文件;
集群信息共享:
通过同一个数据库实例实现;
网友评论