下面来分析一下Dubbo的注册中心。说起Dubbo的注册中心,大家最先想到的就是Zookeeper了,但是Dubbo不是只有Zookeeper这一个注册中心,Redis也可以作为Dubbo的注册中心。下面我们就来分析一下Dubbo的注册中心是怎么实现的。
先来看一下ZookeeperRegistry这个类
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获得 Zookeeper 根节点
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); // `url.parameters.group` 参数值
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 创建 Zookeeper Client
zkClient = zookeeperTransporter.connect(url);
// 添加 StateListener 对象。该监听器,在重连时,调用恢复方法。
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
看下doSubscribe方法
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 处理所有 Service 层的发起订阅,例如监控中心的订阅
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// 获得 url 对应的监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) { // 不存在,进行创建
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
// 获得 ChildListener 对象
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) { // 不存在 ChildListener 对象,进行创建 ChildListener 对象
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
// 新增 Service 接口全名时(即新增服务),发起该 Service 层的订阅
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
// 创建 Service 节点。该节点为持久节点。
zkClient.create(root, false);
// 向 Zookeeper ,Service 节点,发起订阅
List<String> services = zkClient.addChildListener(root, zkListener);
// 首次全量数据获取完成时,循环 Service 接口全名数组,发起该 Service 层的订阅
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
// 处理指定 Service 层的发起订阅,例如服务消费者的订阅
} else {
// 子节点数据数组
List<URL> urls = new ArrayList<URL>();
// 循环分类数组
for (String path : toCategoriesPath(url)) {
// 获得 url 对应的监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) { // 不存在,进行创建
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
// 获得 ChildListener 对象
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) { // 不存在 ChildListener 对象,进行创建 ChildListener 对象
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
// 变更时,调用 `notify` 方法,回调 NotifyListener
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
// 创建 Type 节点。该节点为持久节点。
zkClient.create(path, false);
// 向 Zookeeper ,PATH 节点,发起订阅
List<String> children = zkClient.addChildListener(path, zkListener);
// 添加到 `urls` 中
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 首次全量数据获取完成时,调用 `notify` 方法,回调 NotifyListener
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
ZookeeperRegistry 构造函数中为zookeeper的操作客户端添加了一个状态监听器 StateListener,当重新连接时( 重新连接意味着之前连接断开了 ),将已经注册和订阅的URL添加到失败集合中,定时重试,也就是重新注册和订阅。
zookeeper Client与Server断开连接后,会定时的不断尝试重新连接,当连接成功后就会触发一个Event,Dubbo注册了CONNECTED状态的监听器,当连接成功后重新注册和订阅。
再来看一下FailbackRegistry的retry方法
protected void retry() {
// 重试执行注册
if (!failedRegistered.isEmpty()) {
Set<URL> failed = new HashSet<URL>(failedRegistered); // 避免并发冲突
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry register " + failed);
}
try {
for (URL url : failed) {
try {
// 执行注册
doRegister(url);
// 移除出 `failedRegistered`
failedRegistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
// 重试执行取消注册
if (!failedUnregistered.isEmpty()) {
Set<URL> failed = new HashSet<URL>(failedUnregistered); // 避免并发冲突
if (!failed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Retry unregister " + failed);
}
try {
for (URL url : failed) {
try {
// 执行取消注册
doUnregister(url);
// 移除出 `failedUnregistered`
failedUnregistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
// 重试执行注册
if (!failedSubscribed.isEmpty()) {
Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed); // 避免并发冲突
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
if (entry.getValue() == null || entry.getValue().size() == 0) {
failed.remove(entry.getKey());
}
}
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry subscribe " + failed);
}
try {
for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
URL url = entry.getKey();
Set<NotifyListener> listeners = entry.getValue();
for (NotifyListener listener : listeners) {
try {
// 执行注册
doSubscribe(url, listener);
// 移除出监听器
listeners.remove(listener);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
// 重试执行取消注册
if (!failedUnsubscribed.isEmpty()) {
Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
if (entry.getValue() == null || entry.getValue().isEmpty()) {
failed.remove(entry.getKey());
}
}
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry unsubscribe " + failed);
}
try {
for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
URL url = entry.getKey();
Set<NotifyListener> listeners = entry.getValue();
for (NotifyListener listener : listeners) {
try {
// 执行取消注册
doUnsubscribe(url, listener);
// 移除出监听器
listeners.remove(listener);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
// 重试执行通知监听器
if (!failedNotified.isEmpty()) {
Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {
if (entry.getValue() == null || entry.getValue().size() == 0) {
failed.remove(entry.getKey());
}
}
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry notify " + failed);
}
try {
for (Map<NotifyListener, List<URL>> values : failed.values()) {
for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
try {
NotifyListener listener = entry.getKey();
List<URL> urls = entry.getValue();
// 通知监听器
listener.notify(urls);
// 移除出监听器
values.remove(listener);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
}
在retry方法中,会重试之前执行失败的动作。
再来看一下RegistryDirectory的refreshInvoker方法
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
// 设置禁止访问
this.forbidden = true; // Forbid to access
// methodInvokerMap 置空
this.methodInvokerMap = null; // Set the method invoker map to null
// 销毁所有 Invoker 集合
destroyAllInvokers(); // Close all invokers
} else {
// 设置允许访问
this.forbidden = false; // Allow to access
// 引用老的 urlInvokerMap
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
// 传入的 invokerUrls 为空,说明是路由规则或配置规则发生改变,此时 invokerUrls 是空的,直接使用 cachedInvokerUrls 。
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
// 传入的 invokerUrls 非空,更新 cachedInvokerUrls 。
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls); //Cached invoker urls, convenient for comparison //缓存invokerUrls列表,便于交叉对比
}
// 忽略,若无 invokerUrls
if (invokerUrls.isEmpty()) {
return;
}
// 将传入的 invokerUrls ,转成新的 urlInvokerMap
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
// 转换出新的 methodInvokerMap
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed. 如果计算错误,则不进行处理.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
// 若服务引用多 group ,则按照 method + group 聚合 Invoker 集合
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
// 销毁不再使用的 Invoker 集合
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
更新Dubbo内的Invoker相关数据,保证Consumer能实时感知到Provider的信息,保证rpc调用不会出错。
Dubbo的注册中心就分析到这里了。
网友评论