美文网首页Dubbo
Dubbo——Registry服务注册

Dubbo——Registry服务注册

作者: 小波同学 | 来源:发表于2021-04-14 02:09 被阅读0次

    注册中心

    注册中心(Registry)在微服务架构中的作用举足轻重,有了它,服务提供者(Provider) 和消费者(Consumer) 就能感知彼此。

    Registry 只是 Consumer 和 Provider 感知彼此状态变化的一种便捷途径而已,它们彼此的实际通讯交互过程是直接进行的,对于 Registry 来说是透明无感的。Provider 状态发生变化了,会由 Registry 主动推送订阅了该 Provider 的所有 Consumer,这保证了 Consumer 感知 Provider 状态变化的及时性,也将和具体业务需求逻辑交互解耦,提升了系统的稳定性。

    Dubbo 中存在很多概念,但有些理解起来就特别费劲,如本文的 Registry,翻译过来的意思是“注册中心”,但它其实是应用本地的注册中心客户端,真正的“注册中心”服务是其他独立部署的进程,或进程组成的集群,比如 ZooKeeper 集群。本地的 Registry 通过和 ZooKeeper 等进行实时的信息同步,维持这些内容的一致性,从而实现了注册中心这个特性。另外,就 Registry 而言,Consumer 和 Provider 只是个用户视角的概念,它们被抽象为了一条 URL 。

    基础数据结构

    1、dubbo 还是通过SPI技术,根据参数URL来动态选择不同的注册中心。

    @SPI("dubbo")
    public interface RegistryFactory {
        @Adaptive({"protocol"})
        Registry getRegistry(URL url);
    }
    

    RegistryFactory 就是产生一个注册中心的工程,它有个自适应的方法getRegistry,那么我们知道dubbo会通过javassist动态产生一个RegistryFactory$Adaptive类,并且getRegistry方法的内部实现大致是如下:

    public class RegistryFactory$Adaptive implements RegistryFactory {
        @Override
        public Registry getRegistry(URL url) {
            if (url == null) throw new IllegalArgumentException("url == null");
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) " +
                        "name from url (" + url.toString() + ") use keys([protocol])");
            RegistryFactory extension = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(extName);
            return extension.getRegistry(url);
        }
    }
    

    它通过传入的URL的protocol协议字段排判断是什么类型注册中心。例如,url的protocol的协议是zookeeper,那么就会根据SPI的ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper")得到一个产生ZooKeeper注册中心的工厂,也就是ZookeeperRegistryFactory,而ZookeeperRegistryFactory这个类的getRegistry就是返回一个Zookeeper注册中心。

    2、Registry 接口是所有注册中心的抽象。它的类关系图如下:

    可以看出其语义,一个注册中心Registry是一个节点(extends Node),并且它具有注册服务(extends RegistryService)的功能。

    dubbo支持如下这些注册中心zookeeper、consul、etcd3、eureka、nacas、redis、sofa,那么就会产生相应如下的Registry:ZookeeperRegistry、ConsulRegistry、EtcdRegistry、NacosRegistry、RedisRegistry、SofaRegistry。类图如下:

    所以我们知道,这些注册中心都是继承FailbackRegistry,这个FailbackRegistry其意思就是说,如果一个服务注册到当前某个注册中心注册失败后,可会在后台产生一个daemon线程,定时的把注册失败服务重新注册,并且有一定的重试限制。

    在上面的类图中我们并没有发现有个名为EurekaRegistry这样的类,因为实现了另一个接口ServiceDiscovery方式,类名为EurekaServiceDiscovery来进行服务发现。

    3、RegistryProtocol

    dubbo的协议是通过名为org.apache.dubbo.rpc.Protocol来进行抽象的,那么注册协议也是一样的,是通过org.apache.dubbo.registry.integration.RegistryProtocol来表达的,继承org.apache.dubbo.rpc.Protocol。RegistryPrtocol是扩展点Protocol的具体实现,会一次调用其setter方法来注入其需要的属性,RegistryPrtocol其中有个属性就是RegistryFactory,那么就要为它注入一个具体的RegistryFactory,那么这个具体的RegistryFactory工厂就是上面的RegistryFactory$Adaptive。因为注入的属性对象会从SpringExtensionFactory和SpiExtensionFactory工厂中查询,刚好RegistryFactory也是一个扩展点,所以会在SpiExtensionFactory找出,并且SpiExtensionFactory工厂的实现如下:

    public class SpiExtensionFactory implements ExtensionFactory {
    
        @Override
        public <T> T getExtension(Class<T> type, String name) {
            if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
                ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
                if (!loader.getSupportedExtensions().isEmpty()) {
                    return loader.getAdaptiveExtension();
                }
            }
            return null;
        }
    
    }
    

    所以知道是返回一个自适应的扩展点,即RegistryFactory$Adaptive。
    Protocol协议具有导出服务export()功能,和引用服务refer()功能。在RegistryProtocol中,在这个2个方法内就有对服务注册到注册中心的操作。

    4、服务导出

    在服务导出中,首先要有一个认知,做dubbo服务暴露的时候,我们有2中方式,一种是通过注解的方式:
    @DubboService、@Service(非spring的)。或者通过xml的方式<dubbo:service />。
    不管采用哪一种方式,最终需要暴露的服务首先会包装成一个ServiceBean的对象。这个ServiceBean 持有具体需要服务注册的对象ref。ServiceBean的类图如下:

    服务导出也是是一个繁琐的过程,本文只讨论其服务导出引入与注册中心交互。

    5、dubbo启动引导其服务导出。

    DubboBootstrap是一个dubbo框架启动的帮助类,他有一个start()方法,在该方法的内部就会调用exportServices()用于导出服务,和调用referServices()进行引用服务。

    一般使用dubbo框架的都会引入Spring框架,Spring框架有一个事件监听机制,dubbo正是监听Spring的上下文刷新事件ContextRefreshedEvent,来启动Dubbo服务的。这个服务监听类就是DubboBootstrapApplicationListener。

    DubboBootstrapApplicationListener是如何注册到Spring中的呢?

    • 1、如果是通过注解@DubboService,就是通过ServiceClassPostProcessor类,该类是实现了Spring的BeanDefinitionRegistryPostProcessor。所以通过registerBeans进行注册。在@EnableDubbo注解上有一个@DubboComponentScan注解,该注解上的@export注解就会导入DubboComponentScanRegistrar类,在该类中完成DubboBootstrapApplicationListener的注册。

    • 2、如果是通过<dubbo:service />的方式,我们知道Spring对于自定义的标签,需要自已提供一个NamespaceHanlder的实现类来协助解析自定义标签。而dubbo的NamespaceHanlder实现类为DubboNamespaceHandler。DubboNamespaceHandler该类就有该监听器的注入。
      并且classpath下的META-INF下新增spring.hanlders和spring.schemes。内容如下:
      spring.shemes :
      http://dubbo.apache.org/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
      说明dubbo的命名空间文件位置
      spring.handler:
      http://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
      说明处理该命名空间下的自定义标签通过DubboNamespaceHandler。

    源码跟踪

    registry方法定位到FailbackRegistry,主要作用当服务注册失败后,可以在后端线程重试。

    public abstract class FailbackRegistry extends AbstractRegistry {
    
        @Override
        public void register(URL url) {
            // 判断该注册中心能接受的协议
            if (!acceptable(url)) {
                logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
                return;
            }
            // 调用AbstractRegistry的register(),主要是吧注册的URL放入registered集合中,说明该URL已经要被注册
            super.register(url);
            // 当前URL需要被注册,所以把它从注册失败列表里移除,因为可能是重试注册。
            removeFailedRegistered(url);
            // 当前URL需要被注册,所以把它从注销失败列表里移除,因为可能是重试注册。
            removeFailedUnregistered(url);
            try {
                //调用子类的具体doRegister,模板方法
                doRegister(url);
            } catch (Exception e) {
                Throwable t = e;
    
                // If the startup detection is opened, the Exception is thrown directly.
                // 查看是否check字段是否设置为true.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true)
                        && !CONSUMER_PROTOCOL.equals(url.getProtocol());
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                //如果需要严格检测的话,直接抛异常
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
    
                // 否则把注册失败的URL 添加到failedRegistered,注册失败列表
                addFailedRegistered(url);
            }
        }
    }
    
    public abstract class FailbackRegistry extends AbstractRegistry {
        
        private void addFailedRegistered(URL url) {
            //获取该注册URL是否已经存在在注册失败列表里,存在直接返回
            FailedRegisteredTask oldOne = failedRegistered.get(url);
            if (oldOne != null) {
                return;
            }
            // 否则创建一个失败注册重试任务FailedRegisteredTask,放入failedRegistered中。
            FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
            oldOne = failedRegistered.putIfAbsent(url, newTask);
            if (oldOne == null) {
                // 然后把该失败注册任务放入daemon线程retryTimer,定式重新注册
                retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
            }
        }
    }
    

    接下来分析AbstractRegistry 的作用和FailbackRegistry的重试机制,并且详细剖析ZookeeperRegistry。

    AbstractRegistry

    首先,直接引出这个类的作用,该类主要把服务提供者信息缓存本地文件上,文件目录是:当前用户目录下的/.dubbo/dubbo-registry-{application}-{hos}-${port}.cache。
    在解读源码前,先阅读下AbstractRegistry类的成员变量,从成员变量中可以看到这个类是怎么完成数据的本地化存储的。

    public abstract class AbstractRegistry implements Registry {
    
        // URL 地址分隔符
        private static final char URL_SEPARATOR = ' ';
        
        //URL地址正则表达式,任何空白符
        private static final String URL_SPLIT = "\\s+";
        
        // 参数保存到本地文件的最大重试次数
        private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3;
        
        // 需要保存的参数
        private final Properties properties = new Properties();
        
        // 保存线程,可以看出是否异步保存
        private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
        
        // 是否同步保存
        private boolean syncSaveFile;
        
        // 上一次保存的版本,每次保存更新+1
        private final AtomicLong lastCacheChanged = new AtomicLong();
        
        // 保存重试的次数
        private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger();
        
        // 服务注册的URL保存在这里
        private final Set<URL> registered = new ConcurrentHashSet<>();
        
        // 订阅的URL,key:消费端订阅者URL,values: 通知监听器
        private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
        
        // 订阅的URL,key:消费端订阅者URL,values: Map ,key:服务提供者的名字(默认为providers,configurations,routers),和服务提供者URL
        private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
        
        //当前注册URL,于指定的注册中心连接的URL
        private URL registryUrl;
        
        //本地文件
        private File file;
    }
    

    入口,构造函数

    public abstract class AbstractRegistry implements Registry {
    
        public AbstractRegistry(URL url) {
            // 保存与注册中心连接的url.
            setUrl(url);
            
            //判断是否需要缓存本地文件,默认需要,文件地址
            if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
                // Start file save timer
                // 是否同步保存,默认是异步
                syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
                
                //文件名和路径一般在,当前用户目录下的/.dubbo/dubbo-registry-${application}-${hos}-${port}.cache
                //例如dubbo-registry-dubbo-demo-annotation-provider-106.52.187.48-2181.cache
                String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
                String filename = url.getParameter(FILE_KEY, defaultFilename);
                File file = null;
                
                //创建文件
                if (ConfigUtils.isNotEmpty(filename)) {
                    file = new File(filename);
                    if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                        if (!file.getParentFile().mkdirs()) {
                            throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                        }
                    }
                }
                this.file = file;
                //在启动订阅中心时,我们需要读取本地缓存文件,以便将来进行注册表容错处理。 
                //其实就是把本地文件file的内容 放入参数properties里
                loadProperties();
                // 进行通知url.getBackupUrls(),第一个参数就是url 自己本身
                notify(url.getBackupUrls());
            }
        }
    }
    

    上面的注释已经非常的清晰了,这里就不在描述,需要关注的是notify()这个函数,所以当每个服务注册和订阅时,首次创建注册中心都会进行notify操作。具体来看下notify方法。

    public abstract class AbstractRegistry implements Registry {
    
        protected void notify(List<URL> urls) {
            // 这里是注册中心链接的url,里面包括了服务提供方的信息(key:interface等)
            if (CollectionUtils.isEmpty(urls)) {
                return;
            }
            // 这里循环所有的订阅URL
            for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
                URL url = entry.getKey();
                // 查看订阅的url 是否是订阅当前的注册服务。不是的话,轮训下一个
                if (!UrlUtils.isMatch(url, urls.get(0))) {
                    continue;
                }
                // 这里订阅的URL的通知监听器
                Set<NotifyListener> listeners = entry.getValue();
                if (listeners != null) {
                
                    // 然后进行依次遍历通知
                    for (NotifyListener listener : listeners) {
                        try {
                            notify(url, listener, filterEmpty(url, urls));
                        } catch (Throwable t) {
                            logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                        }
                    }
                }
            }
        }
    }
    
    • 接下来看下具体的notify(URL url, NotifyListener listener, List urls)
    public abstract class AbstractRegistry implements Registry {
    
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((CollectionUtils.isEmpty(urls))
                    && !ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            }
            // keep every provider's category.
            Map<String, List<URL>> result = new HashMap<>();
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {// 这里再一次判断,订阅URL 和服务提供者URL 是否匹配
                    String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                    List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
            Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
                
                //监听通知
                listener.notify(categoryList);
                // 我们将在每次通知后更新缓存文件。
                // 当我们的注册表由于网络抖动而出现订阅失败时,我们至少可以返回现有的缓存URL。
                saveProperties(url);
            }
        }
    }
    
    • 接着看下saveProperties
    public abstract class AbstractRegistry implements Registry {
    
        private void saveProperties(URL url) {
            if (file == null) {
                return;
            }
    
            try {
                StringBuilder buf = new StringBuilder();
                // 得到该订阅URL 的所有服务提供者URLS,并放入buf中
                Map<String, List<URL>> categoryNotified = notified.get(url);
                if (categoryNotified != null) {
                    for (List<URL> us : categoryNotified.values()) {
                        for (URL u : us) {
                            if (buf.length() > 0) {
                                buf.append(URL_SEPARATOR);
                            }
                            buf.append(u.toFullString());
                        }
                    }
                }
                // key 服务接口,value :提供者URL.
                properties.setProperty(url.getServiceKey(), buf.toString());
                // 新增一个版本
                long version = lastCacheChanged.incrementAndGet();
                if (syncSaveFile) {
                    //同步保存
                    doSaveProperties(version);
                } else {
                    // 异步保存
                    registryCacheExecutor.execute(new SaveProperties(version));
                }
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    

    从上面可以知道,把消费端的订阅的服务信息存入了file文件中,doSaveProperties就是文件操作,不进行分析。再一次强调下,消费端订阅时,会订阅某个具体服务下3个节点(providers,configurations,routers)。

    FailbackRegistry

    接着,FailbackRegistry继承自AbstractRegistry。
    其构造函数如下,可以得知除了调用AbstractRegistry构造方法外,并且创建一个HashedWheelTimer类型的定时器。

    public abstract class FailbackRegistry extends AbstractRegistry {
    
        /**
         * The time in milliseconds the retryExecutor will wait
         */
        private final int retryPeriod;
    
        // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
        private final HashedWheelTimer retryTimer;
    
        public FailbackRegistry(URL url) {
            super(url);
            this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
    
            //集运时间轮转的重试线程器
            retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
        }
    }
    

    并且FailbackRegistry 成员记录一组注册失败和订阅失败的集合,然后通过retryTimer定式扫描这些失败集合,重新发起订阅和注册。

    下面是失败集合:

    public abstract class FailbackRegistry extends AbstractRegistry {
    
        // 这里是注册失败的urls
        private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
    
        // 这里是取消注册失败的urls
        private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
    
        // 这里是订阅失败的urls
        private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
    
        // 这里是取消订阅失败的urls
        private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
    
    }
    

    参考:
    https://www.cnblogs.com/liferecord/p/13462175.html

    https://www.cnblogs.com/liferecord/p/13497411.html

    https://www.cnblogs.com/Cubemen/p/12294377.html

    https://blog.csdn.net/cold___play/article/details/107007130

    https://www.jianshu.com/p/75931e545b36

    相关文章

      网友评论

        本文标题:Dubbo——Registry服务注册

        本文链接:https://www.haomeiwen.com/subject/kwghlltx.html