美文网首页
Dubbo——深入解读服务注册到注册中心

Dubbo——深入解读服务注册到注册中心

作者: 小波同学 | 来源:发表于2021-04-15 02:23 被阅读0次

    一、前言

    前面有说到Dubbo的服务发现机制,也就是SPI,那既然Dubbo内部实现了更加强大的服务发现机制,现在我们就来一起看看Dubbo在发现服务后需要做什么才能将服务注册到注册中心中。

    二、Dubbo服务注册简介

    首先需要明白的是Dubbo是依赖于Spring容器的,Dubbo服务注册过程也是始于Spring容器发布刷新事件。而后Dubbo在接收到事件后,就会进行服务注册,整个逻辑大致分为三个部分:

    • 1、检查参数,组装URL:服务消费方是通过URL拿到服务提供者的,所以我们需要为服务提供者配置好对应的URL。

    • 2、导出服务到本地和远程:这里的本地指的是JVM,远程指的是实现invoke,使得服务消费方能够通过invoke调用到服务。

    • 3、向注册中心注册服务:能够让服务消费方知道服务提供方提供了那个服务。

    三、接收Spring容器刷新事件

    Dubbo服务注册是始于Spring容器发布刷新事件。

    DubboBootstrap的启动是由start方法完成的,start方法的执行成功,意味着dubbo的启动结束。服务端服务的暴露就是在start方法中完成的。

    一般使用dubbo框架的都会引入Spring框架,Spring框架有一个事件监听机制,dubbo正是监听Spring的上下文刷新事件ContextRefreshedEvent,来启动Dubbo服务的。这个服务监听类就是DubboBootstrapApplicationListener。

    public class DubboBootstrapApplicationListener extends OnceApplicationContextEventListener implements Ordered {
    
        private final DubboBootstrap dubboBootstrap;
        
        public DubboBootstrapApplicationListener(ApplicationContext applicationContext) {
            super(applicationContext);
            this.dubboBootstrap = DubboBootstrap.getInstance();
            DubboBootstrapStartStopListenerSpringAdapter.applicationContext = applicationContext;
        }
    
        @Override
        public void onApplicationContextEvent(ApplicationContextEvent event) {
            if (event instanceof ContextRefreshedEvent) {
                onContextRefreshedEvent((ContextRefreshedEvent) event);
            } else if (event instanceof ContextClosedEvent) {
                onContextClosedEvent((ContextClosedEvent) event);
            }
        }
    
        private void onContextRefreshedEvent(ContextRefreshedEvent event) {
            dubboBootstrap.start();
        }
    }
    

    从上面代码可以看出,当发生ContextRefreshedEvent事件的时候,dubbo调用DubboBootstrap的start方法。

    四、start方法详解

    DubboBootstrap是一个dubbo框架启动的帮助类,他有一个start()方法,在该方法的内部就会调用exportServices()用于导出服务,和调用referServices()进行引用服务。

    public class DubboBootstrap extends GenericEventListener {
    
        public DubboBootstrap start() {
            //只能初始化一次
            if (started.compareAndSet(false, true)) {
                ready.set(false);
                //调用初始化方法
                initialize();
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is starting...");
                }
                // 1. export Dubbo Services
                exportServices();
    
                // Not only provider register
                if (!isOnlyRegisterProvider() || hasExportedServices()) {
                    // 2. export MetadataService
                    exportMetadataService();
                    //3. Register the local ServiceInstance if required
                    registerServiceInstance();
                }
    
                referServices();
                if (asyncExportingFutures.size() > 0) {
                    new Thread(() -> {
                        try {
                            this.awaitFinish();
                        } catch (Exception e) {
                            logger.warn(NAME + " exportAsync occurred an exception.");
                        }
                        ready.set(true);
                        if (logger.isInfoEnabled()) {
                            logger.info(NAME + " is ready.");
                        }
                        ExtensionLoader<DubboBootstrapStartStopListener> exts = getExtensionLoader(DubboBootstrapStartStopListener.class);
                        exts.getSupportedExtensionInstances().forEach(ext -> ext.onStart(this));
                    }).start();
                } else {
                    ready.set(true);
                    if (logger.isInfoEnabled()) {
                        logger.info(NAME + " is ready.");
                    }
                    ExtensionLoader<DubboBootstrapStartStopListener> exts = getExtensionLoader(DubboBootstrapStartStopListener.class);
                    exts.getSupportedExtensionInstances().forEach(ext -> ext.onStart(this));
                }
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " has started.");
                }
            }
            return this;
        }
    }
    

    1、initialize方法

    在start方法中,也会调用initialize方法,之前提到过,服务端启动的时候,dubbo在start方法中调用initialize方法做初始化,而客户端启动的时候会跳过initialize方法。

    2、exportServices方法

    initialize方法之后调用exportServices方法,该方法用于暴露服务,服务端使用。

    public class DubboBootstrap extends GenericEventListener {
    
        private final ConfigManager configManager;
    
        private void exportServices() {
            //创建的每个ServiceConfig对象都添加到configManager,下面获取所有的ServiceConfig对象并遍历
            configManager.getServices().forEach(sc -> {
                // TODO, compatible with ServiceConfig.export()
                ServiceConfig serviceConfig = (ServiceConfig) sc;
                serviceConfig.setBootstrap(this);
    
                if (exportAsync) {
                    //异步暴露,使用线程池暴露服务
                    ExecutorService executor = executorRepository.getServiceExporterExecutor();
                    Future<?> future = executor.submit(() -> {
                        sc.export();    //暴露服务
                        exportedServices.add(sc);   //记录所有暴露的服务
                    });
                    asyncExportingFutures.add(future);
                } else {
                    sc.export();//暴露服务
                    exportedServices.add(sc);   //记录所有暴露的服务
                }
            });
        }
    }
    

    3、isOnlyRegisterProvider方法

    isOnlyRegisterProvider方法检查ApplicationConfig的registerConsumer属性,从代码分析,registerConsumer只是用于在这里做了判断,我认为该属性无用,可以不用配置。

    4、hasExportedServices方法

    hasExportedServices()检查是否配置元数据中心的url,如果配置了,返回true。

    5、exportMetadataService方法

    exportMetadataService方法用于暴露本地元数据服务,initMetadataServiceExporter方法里面建立了对象metadataServiceExporter,然后在这里暴露服务。服务暴露后客户端可以使用MetadataService接口方法查看dubbo实例的元数据,服务是以dubbo服务形式发布的。

    public class DubboBootstrap extends GenericEventListener {
    
        private volatile MetadataServiceExporter metadataServiceExporter;
    
        private void exportMetadataService() {
            metadataServiceExporter.export();
        }
    }
    

    6、registerServiceInstance方法

    registerServiceInstance用于将dubbo实例注册到专用于服务发现的注册中心。

    public class DubboBootstrap extends GenericEventListener {
    
        private void registerServiceInstance() {
            if (CollectionUtils.isEmpty(getServiceDiscoveries())) {
                return;
            }
    
            ApplicationConfig application = getApplication();
    
            String serviceName = application.getName();
    
            URL exportedURL = selectMetadataServiceExportedURL();
    
            String host = exportedURL.getHost();
    
            int port = exportedURL.getPort();
    
            //创建ServiceInstance对象,该对象中持有下面方法的入参值,以及注册中心的类型,默认是local。
            //ServiceInstance代表了一个dubbo实例,其他dubbo实例可以通过ServiceInstance的属性信息找到本实例的位置
            ServiceInstance serviceInstance = createServiceInstance(serviceName, host, port);
    
            //获取所有的注册中心,将ServiceInstance对象注册到注册中心,
            //这里的注册中心实现了ServiceDiscovery接口,是专用于服务发现的。
            doRegisterServiceInstance(serviceInstance);
    
            // scheduled task for updating Metadata and ServiceInstance
            executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> {
                InMemoryWritableMetadataService localMetadataService = (InMemoryWritableMetadataService) WritableMetadataService.getDefaultExtension();
                localMetadataService.blockUntilUpdated();
                ServiceInstanceMetadataUtils.refreshMetadataAndInstance();
            }, 0, ConfigurationUtils.get(METADATA_PUBLISH_DELAY_KEY, DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MICROSECONDS);
        }
        
        private void doRegisterServiceInstance(ServiceInstance serviceInstance) {
            //FIXME
            publishMetadataToRemote(serviceInstance);
    
            getServiceDiscoveries().forEach(serviceDiscovery ->
            {
                calInstanceRevision(serviceDiscovery, serviceInstance);
                // register metadata
                serviceDiscovery.register(serviceInstance);
            });
        }   
    }
    

    7、referServices方法

    referServices方法用于处理ReferenceConfig对象。

    public class DubboBootstrap extends GenericEventListener {
    
        private void referServices() {
            if (cache == null) {
                cache = ReferenceConfigCache.getCache();
            }
    
            configManager.getReferences().forEach(rc -> {
                // TODO, compatible with  ReferenceConfig.refer()
                ReferenceConfig referenceConfig = (ReferenceConfig) rc;
                referenceConfig.setBootstrap(this);
    
                if (rc.shouldInit()) {
                    if (referAsync) {
                        CompletableFuture<Object> future = ScheduledCompletableFuture.submit(
                                executorRepository.getServiceExporterExecutor(),
                                () -> cache.get(rc)
                        );
                        asyncReferringFutures.add(future);
                    } else {
                        cache.get(rc);
                    }
                }
            });
        }
    }
    

    在我们平常编写provider的接口实现类时,都会打上@Service注解,从而这个标注这个类属于ServiceBean。

    而DubboBootstrap中的start()方法最终会调用到ServiceBean类中的exported()方法。

    这就是Dubbo服务导出到注册中心过程的起点。需要我们在服务接口实现类上打上@Service。ServiceBean是Dubbo与Spring 框架进行整合的关键,可以看做是两个框架之间的桥梁。具有同样作用的类还有ReferenceBean。

    五、检查配置参数以及URL装配

    1、检查配置

    在这一阶段Dubbo需要检查用户的配置是否合理,或者为用户补充缺省配置。就是从刷新事件开始,进入export()方法,源码解析如下:

    public class ServiceConfig<T> extends ServiceConfigBase<T> {
    
        public synchronized void export() {
            if (bootstrap == null) {
                bootstrap = DubboBootstrap.getInstance();
                bootstrap.initialize();
            }
    
            //检查并且更新配置
            checkAndUpdateSubConfigs();
    
            //init serviceMetadata
            serviceMetadata.setVersion(getVersion());
            serviceMetadata.setGroup(getGroup());
            serviceMetadata.setDefaultGroup(getGroup());
            serviceMetadata.setServiceType(getInterfaceClass());
            serviceMetadata.setServiceInterfaceName(getInterface());
            serviceMetadata.setTarget(getRef());
    
            //是否需要导出
            if (!shouldExport()) {
                return;
            }
    
            //是否需要延时
            if (shouldDelay()) {
                DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
            } else {
                //立刻导出
                doExport();
            }
            //导出后
            exported();
        }
    }
    
    public class ServiceConfig<T> extends ServiceConfigBase<T> {
    
        private void checkAndUpdateSubConfigs() {
            // Use default configs defined explicitly with global scope
            //检查配置项包括provider是否存在,导出端口是否可用,注册中心是否可以连接等等
            // 使用全局范围显式定义的默认配置
            // <dubbo:provider/> <dubbo:protocol/>
            completeCompoundConfigs();
            checkDefault();
            checkProtocol();
            // init some null configuration.
            // 初始化一些空设置
            List<ConfigInitializer> configInitializers = ExtensionLoader.getExtensionLoader(ConfigInitializer.class)
                    .getActivateExtension(URL.valueOf("configInitializer://"), (String[]) null);
            configInitializers.forEach(e -> e.initServiceConfig(this));
    
            // if protocol is not injvm checkRegistry
            if (!isOnlyInJvm()) {
                //如果protocol不是内部调用(injvm)
                //检查registry设置
                checkRegistry();
            }
            
            //检查接口内部方法是否不为空
            //environment中的值覆盖service属性
            this.refresh();
    
            if (StringUtils.isEmpty(interfaceName)) {
                //interface属性必需
                throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
            }
    
            if (ref instanceof GenericService) {
                //如果实际Bean实现了GenericService接口,常用服务bean
                interfaceClass = GenericService.class;
                if (StringUtils.isEmpty(generic)) {
                    generic = Boolean.TRUE.toString();
                }
            } else {
                try {
                    //不是GenericService接口类型,根据interfaceClass = Class.forName(interfaceName)
                    interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                            .getContextClassLoader());
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
                //检查接口方法都实现了
                checkInterfaceAndMethods(interfaceClass, getMethods());
                checkRef();
                generic = Boolean.FALSE.toString();
            }
            if (local != null) {
                //配置<dubbo:service local="true"/>时
                if ("true".equals(local)) {
                    local = interfaceName + "Local";
                }
                Class<?> localClass;
                try {
                    //初始化代理类localClass = ClassforName(interfaceName+"Local")
                    localClass = ClassUtils.forNameWithThreadContextClassLoader(local);
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
                if (!interfaceClass.isAssignableFrom(localClass)) {
                    throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
                }
            }
            if (stub != null) {
                //配置<dubbo:service local="true"/>时
                if ("true".equals(stub)) {
                    stub = interfaceName + "Stub";
                }
                Class<?> stubClass;
                try {
                    //初始化代理类stubClass = ClassforName(interfaceName+"Local")
                    stubClass = ClassUtils.forNameWithThreadContextClassLoader(stub);
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
                if (!interfaceClass.isAssignableFrom(stubClass)) {
                    throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
                }
            }
            //检查若配置了local或stub,localClass或stubClass必须是interfaceClass的子类
            checkStubAndLocal(interfaceClass);
            ConfigValidationUtils.checkMock(interfaceClass, this);
            //检查serviceconfig的属性
            ConfigValidationUtils.validateServiceConfig(this);
            //配置后置处理器ConfigPostProcess,类似于BeanPostProcessor
            postProcessConfig();
        }
    }
    

    上面的源码分析可看出。export方法主要检查的配置项有@Service标签的类是否属性合法。服务提供者是否存在,是否有对应的Application启动,端口是否能连接,是否有对应的注册中心等等一些配置,在检查完这些配置后Dubbo会识别我们此次启动服务是想在本地启动进行一些调试,还是将服务暴露给别人。不想暴露出去可以进行配置。

    <dubbo:provider export="false" />
    

    2、URL装配

    在Dubbo中的URL一般包括以下字段:protocol、host、port、path、parameters。在检查配置后会进入到doExport中。

    • protocol:就是URL最前面的字段,表示的是协议,一般是:dubbo thrift http zk
    • host.port:就是对应的IP地址和端口
    • path:接口名称
    • parameters:参数键值对
    public class ServiceConfig<T> extends ServiceConfigBase<T> {
    
        protected synchronized void doExport() {
            if (unexported) {
                throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
            }
            //已注册的直接返回
            if (exported) {
                return;
            }
            exported = true;
    
            if (StringUtils.isEmpty(path)) {
                //设置path默认值为接口名
                path = interfaceName;
            }
            doExportUrls();
        }
    }
    

    进入到doExportUrls

    public class ServiceConfig<T> extends ServiceConfigBase<T> {
    
        private void doExportUrls() {
            //前面创建服务仓库ServiceRepository
            ServiceRepository repository = ApplicationModel.getServiceRepository();
            //将Service注册到服务仓库ServiceRepository的services中
            ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
            //将Service注册到服务仓库ServiceRepository的providers中
            repository.registerProvider(
                    getUniqueServiceName(),
                    ref,
                    serviceDescriptor,
                    this,
                    serviceMetadata
            );
    
            //生成registryURLs格式如下
            // registry://mcip:2291/org.apache.dubbo.registry.RegistryService?application=dubbo-demo&backup=mcip:2292,mcip:2293&dubbo=2.0.2&pid=13252&registry=zookeeper&release=2.7.5&timestamp=1584191484814
            List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
    
            //<dubbo:protocol name="dubbo" port="${dubbo.port}" />
            for (ProtocolConfig protocolConfig : protocols) {
                //服务路径=protocol的contexpath属性+service的path+group+version
                String pathKey = URL.buildKey(getContextPath(protocolConfig)
                        .map(p -> p + "/" + path)
                        .orElse(path), group, version);
                // In case user specified path, register service one more time to map it to path.
                // 如果指定了特殊路径,再次注入到服务仓库ServiceRepository的services中,
                // 未指定同名覆盖
                repository.registerService(pathKey, interfaceClass);
                // TODO, uncomment this line once service key is unified
                //设置到serviceMetadata中
                serviceMetadata.setServiceKey(pathKey);
                //注册
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }
    }
    

    进入到加载注册中心链接的方法

    public class ConfigValidationUtils {
    
        public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
            // check && override if necessary
            List<URL> registryList = new ArrayList<URL>();
            ApplicationConfig application = interfaceConfig.getApplication();
            List<RegistryConfig> registries = interfaceConfig.getRegistries();
            if (CollectionUtils.isNotEmpty(registries)) {
                for (RegistryConfig config : registries) {
                    String address = config.getAddress();
                    if (StringUtils.isEmpty(address)) {
                        address = ANYHOST_VALUE;
                    }
                    // 判断当前注册中心的地址是否为可用地址,不可用地址格式为 N/A
                    if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                        Map<String, String> map = new HashMap<String, String>();
                        // 添加 ApplicationConfig 中的字段信息到 map 中
                        AbstractConfig.appendParameters(map, application);
                        // 添加 RegistryConfig 字段信息到 map 中
                        AbstractConfig.appendParameters(map, config);
                        // 添加 path,protocol 等信息到 map 中
                        map.put(PATH_KEY, RegistryService.class.getName());
                        AbstractInterfaceConfig.appendRuntimeParameters(map);
                         // 如果注册中心配置中没有配置protocol,则默认用dubbo
                        if (!map.containsKey(PROTOCOL_KEY)) {
                            map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
                        }
                        // 解析得到 URL 列表,address 可能包含多个注册中心 ip,
                        // 因此解析得到的是一个 URL 列表
                        List<URL> urls = UrlUtils.parseURLs(address, map);
                        // 重新构建URL
                        for (URL url : urls) {
                            // 将 URL 协议头设置为 registry
                            url = URLBuilder.from(url)
                                    .addParameter(REGISTRY_KEY, url.getProtocol())
                                    .setProtocol(extractRegistryType(url))
                                    .build();
                            // 满足两个条件会往里添加:1、是提供者且有配置注册中心;2、不是提供者但是配置了订阅    
                            // 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:
                            // (服务提供者 && register = true 或 null) || (非服务提供者 && subscribe = true 或 null)
                            if ((provider && url.getParameter(REGISTER_KEY, true))
                                    || (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
                                    
                                //添加url到registryList中
                                registryList.add(url);
                            }
                        }
                    }
                }
            }
            return genCompatibleRegistries(registryList, provider);
        }
    }
    

    loadRegistries方法主要包含如下的逻辑:

    • 1、构建参数映射集合,也就是 map
    • 2、构建注册中心链接列表
    • 3、遍历链接列表,并根据条件决定是否将其添加到 registryList 中

    实际上因为Dubbo现如今支持很多注册中心,所以对于一些注册中心的URL也要进行遍历构建。这里是生成注册中心的URL。还未生成Dubbo服务的URL。比如说使用的是Zookeeper注册中心,可能从loadRegistries中拿到的就是:

    registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.7.3&pid=1528&qos.port=22222&registry=zookeeper&timestamp=1530743640901
    

    这种类型的URL,表示这是一个注册协议,现在可以根据这个URL定位到注册中心去了。服务接口是RegistryService,registry的类型为zookeeper。可是我们还未生成Dubbo服务提供方的URL所以接着看下面代码

    然后进行到doExportUrlsFor1Protocol(装配Dubbo服务的URL并且实行发布)

    public class ServiceConfig<T> extends ServiceConfigBase<T> {
    
        private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            //首先是将一些信息,比如版本、时间戳、方法名以及各种配置对象的字段信息放入到 map 中
            //map 中的内容将作为 URL 的查询字符串。构建好 map 后,紧接着是获取上下文路径、主机名以及端口号等信息。
            //最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象。需要注意的是,这里出现的 URL 并非 java.net.URL,而是 com.alibaba.dubbo.common.URL。
            String name = protocolConfig.getName();
            // 如果协议名为空,或空串,则将协议名变量设置为 dubbo
            if (StringUtils.isEmpty(name)) {
                //<dubbo:protocol name=""/>默认为dubbo
                name = DUBBO;
            }
    
            Map<String, String> map = new HashMap<String, String>();
            // 添加 side、版本、时间戳以及进程号等信息到 map 中
            map.put(SIDE_KEY, PROVIDER_SIDE);
            
            ServiceConfig.appendRuntimeParameters(map);
            // 通过反射将对象的字段信息添加到 map 中
            AbstractConfig.appendParameters(map, getMetrics());
            AbstractConfig.appendParameters(map, getApplication());
            AbstractConfig.appendParameters(map, getModule());
            // remove 'default.' prefix for configs from ProviderConfig
            // appendParameters(map, provider, Constants.DEFAULT_KEY);
            AbstractConfig.appendParameters(map, provider);
            AbstractConfig.appendParameters(map, protocolConfig);
            AbstractConfig.appendParameters(map, this);
            MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
            if (metadataReportConfig != null && metadataReportConfig.isValid()) {
                map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
            }
            // methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
            if (CollectionUtils.isNotEmpty(getMethods())) {
                //检测 <dubbo:method> 标签中的配置信息,并将相关配置添加到 map 中
                for (MethodConfig method : getMethods()) {
                    AbstractConfig.appendParameters(map, method, method.getName());
                    String retryKey = method.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(method.getName() + ".retries", "0");
                        }
                    }
                    List<ArgumentConfig> arguments = method.getArguments();
                    if (CollectionUtils.isNotEmpty(arguments)) {
                        for (ArgumentConfig argument : arguments) {
                            // convert argument type
                            if (argument.getType() != null && argument.getType().length() > 0) {
                                Method[] methods = interfaceClass.getMethods();
                                // visit all methods
                                if (methods.length > 0) {
                                    for (int i = 0; i < methods.length; i++) {
                                        String methodName = methods[i].getName();
                                        // target the method, and get its signature
                                        if (methodName.equals(method.getName())) {
                                            Class<?>[] argtypes = methods[i].getParameterTypes();
                                            // one callback in the method
                                            if (argument.getIndex() != -1) {
                                                if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                    AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                                } else {
                                                    throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            } else {
                                                // multiple callbacks in the method
                                                for (int j = 0; j < argtypes.length; j++) {
                                                    Class<?> argclazz = argtypes[j];
                                                    if (argclazz.getName().equals(argument.getType())) {
                                                        AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
                                                        if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                            throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            } else if (argument.getIndex() != -1) {
                                AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                            } else {
                                throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                            }
    
                        }
                    }
                } // end of methods for
            }
    
            //genericService设置
            if (ProtocolUtils.isGeneric(generic)) {
                map.put(GENERIC_KEY, generic);
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                //接口版本号,跟随jar包 例如:1.0-SNAPSHOT
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put(REVISION_KEY, revision);
                }
                //接口的方法名数组
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("No method found in service interface " + interfaceClass.getName());
                    //空方法标志map.put("methods","*")
                    map.put(METHODS_KEY, ANY_VALUE);
                } else {
                    //方法数组字符串map.put("methods","insert,update")
                    map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
                }
            }
    
            /**
             * Here the token value configured by the provider is used to assign the value to ServiceConfig#token
             * Service的token验证
             */
            if (ConfigUtils.isEmpty(token) && provider != null) {
                token = provider.getToken();
            }
    
            if (!ConfigUtils.isEmpty(token)) {
                if (ConfigUtils.isDefault(token)) {
                    map.put(TOKEN_KEY, UUID.randomUUID().toString());
                } else {
                    map.put(TOKEN_KEY, token);
                }
            }
            //init serviceMetadata attachments
            //map放入到serviceMetadata中
            serviceMetadata.getAttachments().putAll(map);
    
            // export service
            // 获取协议host,默认获取本机ip <dubbo:protocol host="" />
            String host = findConfigedHosts(protocolConfig, registryURLs, map);
            //获取协议port,dubbo默认20880 <dubbo:protocol port="" />
            Integer port = findConfigedPorts(protocolConfig, name, map);
            //生成url格式如下
            //dubbo://192.168.56.1:20880/org.study.service.UserService?anyhost=true&application=dubbo-demo&bind.ip=192.168.56.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.study.service.UserService&methods=getUserById,update,insert,transactionalTest,getUserByUserId,delete&pid=13252&release=2.7.5&revision=1.0-SNAPSHOT&side=provider&timestamp=1584192937036
            URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    
            // You can customize Configurator to append extra parameters
            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .hasExtension(url.getProtocol())) {
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
            }
            //获取作用域scope属性
            String scope = url.getParameter(SCOPE_KEY);
            // don't export when none is configured
            // scope == "none",不注册,一般为null
            if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
    
                // export to local if the config is not remote (export to remote only when config is remote)
                if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                    //scope != "remote" ,注册到本地,一般为null会注册到本地
                    exportLocal(url);
                }
                // export to remote if the config is not local (export to local only when config is local)
                if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                    //scope != "local" ,注册到注册表,一般为null会注册到注册表
                    if (CollectionUtils.isNotEmpty(registryURLs)) {
                        for (URL registryURL : registryURLs) {
                            //if protocol is only injvm ,not register
                            if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                                //injvm,内部调用不注册
                                continue;
                            }
                            //url存在dynamic保留,不存在赋值为registryURL的dynamic属性值
                            url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                            //监控的url
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                if (url.getParameter(REGISTER_KEY, true)) {
                                    logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                                } else {
                                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                                }
                            }
    
                            // For providers, this is used to enable custom proxy to generate invoker
                            // provider代理属性 一般为null
                            String proxy = url.getParameter(PROXY_KEY);
                            if (StringUtils.isNotEmpty(proxy)) {
                                registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                            }
                            //创建一个AbstractProxyInvoker的子类实例new AbstractProxyInvoker(ref, interfaceClass,rul)
                            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                            //包装器
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                            //默认DubboProtocol.export注册到远程注册表zookeeper中
                            Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                            exporters.add(exporter);
                        }
                        
                    // 不存在注册中心,仅导出服务
                    } else {
                        if (logger.isInfoEnabled()) {
                            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                        }
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
    
                    MetadataUtils.publishServiceDefinition(url);
                }
            }
            this.urls.add(url);
        }
    }
    

    上面的源码前半段是进行URL装配,这个URL就是Dubbo服务的URL,大致如下:

    dubbo://192.168.1.6:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.1.6&bind.port=20880&dubbo=2.7.3&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=5744&qos.port=22222&side=provider&timestamp=1530746052546
    

    这个URL表示它是一个dubbo协议(DubboProtocol),地址是当前服务器的ip,端口是要暴露的服务的端口号,可以从dubbo:protocol配置,服务接口为dubbo:service配置发布的接口。

    后半段主要是判断scope变量来决定是否将服务导出远程或者本地,导出到本地实际上很简单只需要生成Invoker。当导出到远程就需要添加监视器还要生成invoker。监视器能让Dubbo定时查看注册中心挂了没。会抛出指定异常,而invoker使得服务消费方能够远程调用到服务。并且还会进行注册到注册中心下面我们接着来看看服务的发布。因为Invoker比较重要在消费者和提供者中都有,所以这个后面会单独拿出来进行探讨。

    五、服务发布本地与远程

    1、服务发布到本地

    public class ServiceConfig<T> extends ServiceConfigBase<T> {
    
        private void exportLocal(URL url) {
            //进行本地URL的构建
            URL local = URLBuilder.from(url)
                    .setProtocol(LOCAL_PROTOCOL)
                    .setHost(LOCALHOST_VALUE)
                    .setPort(0)
                    .build();
            //根据本地的URL来实现对应的Invoker
            Exporter<?> exporter = PROTOCOL.export(
                    PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
        }
    }
    
    String LOCALHOST_VALUE = "127.0.0.1";
        
    String LOCAL_PROTOCOL = "injvm";
    

    可见发布到本地是重新构建了protocol,injvm就是代表在本地的JVM里,host与port都统一默认127.0.0.1:0。

    2、服务发布到远程

    public class RegistryProtocol implements Protocol {
    
        @Override
        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            //注册url,zookeeper
            //zookeeper://mcip:2291/org.apache.dubbo.registry.RegistryService?application=dubbo-demo&backup=mcip:2292,mcip:2293&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.study.service.UserService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.study.service.UserService%26methods%3DgetUserById%2Cupdate%2Cinsert%2CtransactionalTest%2CgetUserByUserId%2Cdelete%26pid%3D12956%26release%3D2.7.5%26revision%3D1.0-SNAPSHOT%26side%3Dprovider%26timestamp%3D1584262152345&pid=12956&release=2.7.5&timestamp=1584262152342
            URL registryUrl = getRegistryUrl(originInvoker);
            // url to export locally
            // 暴露服务的url
            //dubbo://192.168.56.1:20880/org.study.service.UserService?anyhost=true&application=dubbo-demo&bind.ip=192.168.56.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.study.service.UserService&methods=getUserById,update,insert,transactionalTest,getUserByUserId,delete&pid=12956&release=2.7.5&revision=1.0-SNAPSHOT&side=provider&timestamp=1584262152345
            URL providerUrl = getProviderUrl(originInvoker);
    
            // Subscribe the override data
            // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
            //  the same service. Because the subscribed is cached key with the name of the service, it causes the
            //  subscription information to cover.
            //获取订阅URL,比如:provider://192.168.1.6:20880/......
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
            //url与service绑定,放入容器中,远程调用时根据url找到serviceimpl
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
            //向订阅中心推送监听器
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
            providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
            //export invoker
            //导出服务
            //注册invoker到本地dubbo.expo
            //调用DubboProtocol.export,其中openServer开启netty监听,
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
            // url to registry
            // 最终的注册到zookeeper
            final Registry registry = getRegistry(originInvoker);
            //获取已注册的服务提供者的URL,比如dubbo://192.168.1.6:20880/.......
            final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
    
            // decide if we need to delay publish
            // 获取 register 参数
            boolean register = providerUrl.getParameter(REGISTER_KEY, true);
            // 根据 register 的值决定是否注册服务
            if (register) {
                register(registryUrl, registeredProviderUrl);
            }
    
            // register stated url on provider model
            registerStatedUrl(registryUrl, registeredProviderUrl, register);
    
    
            exporter.setRegisterUrl(registeredProviderUrl);
            // 向注册中心进行订阅 override 数据
            exporter.setSubscribeUrl(overrideSubscribeUrl);
    
            // Deprecated! Subscribe to override rules in 2.6.x or before.
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
            //确认注册完
            notifyExport(exporter);
            //Ensure that a new exporter instance is returned every time export
            // 创建并返回 DestroyableExporter
            return new DestroyableExporter<>(exporter);
        }
    }
    

    上面的源码主要是根据前面生成的URL进行服务的发布和注册。当执行到doLocalExport也就是发布本地服务到远程时候会调用 DubboProtocol 的 export 方法大致会经历下面一些步骤来导出服务

    • 从Invoker获取providerUrl,构建serviceKey(group/service:version:port),构建DubboExporter并以serviceKey为key放入本地map缓存。

    • 处理url携带的本地存根和callback回调。

    • 根据url打开服务器端口,暴露本地服务。先以url.getAddress为key查询本地缓存serverMap获取ExchangeServer,如果不存在,则通过createServer创建。

    • createServer方法,设置心跳时间,判断url中的传输方式(key=server,对应Transporter服务)是否支持,设置codec=dubbo,最后根据url和ExchangeHandler对象绑定server返回,这里的ExchangeHandler非常重要,它就是消费方调用时,底层通信层回调的Handler,从而获取包含实际Service实现的Invoker执行器,它是定义在DubboProtocol类中的ExchangeHandlerAdapter内部类。

    • 返回DubboExporter对象。

    到这里大致的服务发布图如下:


    六、服务注册

    服务注册操作对于 Dubbo 来说不是必需的,通过服务直连的方式就可以绕过注册中心。但通常我们不会这么做,直连方式不利于服务治理,仅推荐在测试服务时使用。对于 Dubbo 来说,注册中心虽不是必需,但却是必要的。源码如下:

    public class RegistryProtocol implements Protocol {
    
        private void register(URL registryUrl, URL registeredProviderUrl) {
            Registry registry = registryFactory.getRegistry(registryUrl);
            registry.register(registeredProviderUrl);
        }
    }
    
    public abstract class FailbackRegistry extends AbstractRegistry {
    
        @Override
        public void register(URL url) {
            if (!acceptable(url)) {
                logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
                return;
            }
            super.register(url);
            removeFailedRegistered(url);
            removeFailedUnregistered(url);
            try {
                // Sending a registration request to the server side
                // 模板方法,由子类实现
                doRegister(url);
            } catch (Exception e) {
                Throwable t = e;
    
                // If the startup detection is opened, the Exception is thrown directly.
                // 获取 check 参数,若 check = true 将会直接抛出异常
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true)
                        && !CONSUMER_PROTOCOL.equals(url.getProtocol());
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
    
                // Record a failed registration request to a failed list, retry regularly
                // 记录注册失败的链接
                addFailedRegistered(url);
            }
        }
    }
    
    • 进入doRegister方法,不同的注册中心有相对应的类


    public class ZookeeperRegistry extends FailbackRegistry {
    
        @Override
        public void doRegister(URL url) {
            try {
                // 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
                //   /${group}/${serviceInterface}/providers/${url}
                // 比如
                //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
                zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    }
    
    • 进入create方法
    public abstract class AbstractZookeeperClient<TargetDataListener, TargetChildListener> implements ZookeeperClient {
    
        @Override
        public void create(String path, boolean ephemeral) {
            if (!ephemeral) {
                if(persistentExistNodePath.contains(path)){
                    return;
                }
                // 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
                if (checkExists(path)) {
                    persistentExistNodePath.add(path);
                    return;
                }
            }
            int i = path.lastIndexOf('/');
            if (i > 0) {
                // 递归创建上一级路径
                create(path.substring(0, i), false);
            }
            // 根据 ephemeral 的值创建临时或持久节点
            if (ephemeral) {
                createEphemeral(path);
            } else {
                createPersistent(path);
                persistentExistNodePath.add(path);
            }
        }
    }
    
    • 进入createEphemeral
    public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZookeeperClient.CuratorWatcherImpl, CuratorZookeeperClient.CuratorWatcherImpl> {
    
        @Override
        public void createEphemeral(String path) {
            try {
                // 通过 Curator 框架创建节点
                client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
            } catch (NodeExistsException e) {
                logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                        ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                        " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                        "we can just try to delete and create again.", e);
                deletePath(path);
                createEphemeral(path);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    }
    

    根据上面的方法,可以将当前服务对应的配置信息(存储在URL中的)注册到注册中心/dubbo/org.apache.dubbo.demo.DemoService/providers/ 。里面直接使用了Curator进行创建节点(Curator是Netflix公司开源的一套zookeeper客户端框架)。

    七、总结

    到这里Dubbo的服务注册流程终于是解释完。核心在于Dubbo使用规定好的URL+SPI进行寻找和发现服务,通过URL定位注册中心,再通过将服务的URL发布到注册中心从而使得消费者可以知道服务的有哪些,里面可以看见对于URL这种复杂的对象并且需要经常更改的,通常采用建造者模式。而2.7.3版本的Dubbo源码也使用了Java8以后的新特性Lambda表达式来构建隐式函数。

    参考:
    https://www.cnblogs.com/Cubemen/p/12294377.html

    https://blog.csdn.net/weixin_38308374/article/details/105983937

    https://www.cnblogs.com/wqff-biubiu/p/12490393.html

    相关文章

      网友评论

          本文标题:Dubbo——深入解读服务注册到注册中心

          本文链接:https://www.haomeiwen.com/subject/wmvrlltx.html