美文网首页
Dubbo服务暴露

Dubbo服务暴露

作者: DH大黄 | 来源:发表于2021-12-23 19:58 被阅读0次

    服务暴露

    服务暴露的流程可以概括为以下四点

    • 组装URL
    • 创建Invoker代理类,封装真正的实现类
    • 通过URL选择对应的协议暴露(产生Exporter对象)
    • 向注册中心注册提供者信息

    概括成一句话就是:

    在SpringIOC容器刷新完毕后 (export方法),根据配置参数组装成URL,通过 proxyFactory.getInvoker ,利用javassist进行对象代理,封装真正的实现类,然后通过URL参数选择对应的协议进行protocol.export(默认dubbo协议)。在Producer第一次暴露的时候会调用 createServer 来创建 Server(默认NettyServer),然后将 export 得到的 exporter 存入Map(后面处理请求时查找),然后向注册中心注册提供者的信息

    关键类 ServiceConfig

    总体流程图

    Dubbo服务暴露流程.png

    核心源码

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            String name = protocolConfig.getName();
            if (StringUtils.isEmpty(name)) {
                name = DUBBO;
            }
    
            Map<String, String> map = new HashMap<String, String>();
            map.put(SIDE_KEY, PROVIDER_SIDE);
    
            ServiceConfig.appendRuntimeParameters(map);
            AbstractConfig.appendParameters(map, getMetrics());
            AbstractConfig.appendParameters(map, getApplication());
            AbstractConfig.appendParameters(map, getModule());
            // remove 'default.' prefix for configs from ProviderConfig
            // appendParameters(map, provider, Constants.DEFAULT_KEY);
            AbstractConfig.appendParameters(map, provider);
            AbstractConfig.appendParameters(map, protocolConfig);
            AbstractConfig.appendParameters(map, this);
            MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
            if (metadataReportConfig != null && metadataReportConfig.isValid()) {
                map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
            }
            if (CollectionUtils.isNotEmpty(getMethods())) {
                for (MethodConfig method : getMethods()) {
                    AbstractConfig.appendParameters(map, method, method.getName());
                    String retryKey = method.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(method.getName() + ".retries", "0");
                        }
                    }
                    List<ArgumentConfig> arguments = method.getArguments();
                    if (CollectionUtils.isNotEmpty(arguments)) {
                        for (ArgumentConfig argument : arguments) {
                            // convert argument type
                            if (argument.getType() != null && argument.getType().length() > 0) {
                                Method[] methods = interfaceClass.getMethods();
                                // visit all methods
                                if (methods.length > 0) {
                                    for (int i = 0; i < methods.length; i++) {
                                        String methodName = methods[i].getName();
                                        // target the method, and get its signature
                                        if (methodName.equals(method.getName())) {
                                            Class<?>[] argtypes = methods[i].getParameterTypes();
                                            // one callback in the method
                                            if (argument.getIndex() != -1) {
                                                if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                    AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                                } else {
                                                    throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            } else {
                                                // multiple callbacks in the method
                                                for (int j = 0; j < argtypes.length; j++) {
                                                    Class<?> argclazz = argtypes[j];
                                                    if (argclazz.getName().equals(argument.getType())) {
                                                        AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
                                                        if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                            throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            } else if (argument.getIndex() != -1) {
                                AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                            } else {
                                throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                            }
    
                        }
                    }
                } // end of methods for
            }
    
            // 如果为泛化调用,设置泛型类型
            if (ProtocolUtils.isGeneric(generic)) {
                map.put(GENERIC_KEY, generic);
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put(REVISION_KEY, revision);
                }
    
                // 暴露的方法
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("No method found in service interface " + interfaceClass.getName());
                    map.put(METHODS_KEY, ANY_VALUE);
                } else {
                    map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
                }
            }
    
            /**
             * Here the token value configured by the provider is used to assign the value to ServiceConfig#token
             */
            if (ConfigUtils.isEmpty(token) && provider != null) {
                token = provider.getToken();
            }
    
            if (!ConfigUtils.isEmpty(token)) {
                if (ConfigUtils.isDefault(token)) {
                    map.put(TOKEN_KEY, UUID.randomUUID().toString());
                } else {
                    map.put(TOKEN_KEY, token);
                }
            }
            //init serviceMetadata attachments
            serviceMetadata.getAttachments().putAll(map);
    
            // export service
            String host = findConfigedHosts(protocolConfig, registryURLs, map);
            Integer port = findConfigedPorts(protocolConfig, name, map);
            // 以上操作是往map中填入数据,构建URL
            URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
            // url的内容
            // dubbo://10.167.10.19:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&bind.ip=10.167.10.19&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=43793&release=&side=provider&timestamp=1631080120436
    
            // You can customize Configurator to append extra parameters
            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .hasExtension(url.getProtocol())) {
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
            }
    
            String scope = url.getParameter(SCOPE_KEY);
            // don't export when none is configured
            // 如果scope为SCOPE_NONE不暴露服务
            if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
    
                // export to local if the config is not remote (export to remote only when config is remote)
                // 本地暴露
                if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                    // 有一个本地暴露,只所以需要本地暴露是考虑到同一个服务内假如需要调用当前的dubbo接口,则可以直接走jvm内部(injvm),减少网络间的通讯
                    // 修改url协议为injvm
                    exportLocal(url);
                }
                // export to remote if the config is not local (export to local only when config is local)
                // 远程暴露
                if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                    // 注册中心不为空
                    if (CollectionUtils.isNotEmpty(registryURLs)) {
                        // 遍历注册中心
                        for (URL registryURL : registryURLs) {
                            //if protocol is only injvm ,not register
                            if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                                continue;
                            }
                            url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                if (url.getParameter(REGISTER_KEY, true)) {
                                    logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                                } else {
                                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                                }
                            }
    
                            // For providers, this is used to enable custom proxy to generate invoker
                            String proxy = url.getParameter(PROXY_KEY);
                            if (StringUtils.isNotEmpty(proxy)) {
                                registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                            }
    
                            // 生成Invoker对象(动态代理创建 javassist)
                            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                            // 包装
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                            // provider 需要将 invoker 封装成 expoter(并在此处进行注册)
                            Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                            exporters.add(exporter);
                        }
                    } else {
                        // 直连方式
                        if (logger.isInfoEnabled()) {
                            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                        }
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
    
                    // 元数据存储
                    MetadataUtils.publishServiceDefinition(url);
                }
            }
            this.urls.add(url);
        }
    

    组装URL

    在上面的代码中,前面有一大段都是在进行URL的组装的,这边就不细讲了,具体的可以看下代码的注释

    生成Invoker对象

    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
    

    以Javassist为例(默认的是Javassist)

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        // 通过Javaassist封装成Wrapper类(Dubbo服务启动时生成,所以在运行时不会产生开销),减少反射的调用
            // Wrapper.getWrapper -> 反射获取对应的类信息,包装成Wrapper对象
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            // Wrapper最终调用最终调用服务提供者的接口实现类的方法
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    

    具体的Wrapper内容

    Wrapper内容1.PNG Wrapper内容2.PNG

    暴露操作

    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker)
    

    RegistryProtocol

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);
    
        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
            // 选择具体的协议去暴露服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
        // url to registry
            // 获取注册中心
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
    
        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            // 注册服务
            register(registryUrl, registeredProviderUrl);
        }
    
        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);
    
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
    
        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }
    

    根据具体协议暴露

    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    

    以DubboProtocol为例

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
    
        // export service.
        // 创建ServiceKey
        String key = serviceKey(url);
        // 将上层传入的Invoker对象封装成DubboExporter对象,然后记录到exporterMap集合中
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
    
        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
    
            }
        }
    
        // 启动ProtocolServer
        openServer(url);
        // 进行序列化的优化处理
        optimizeSerialization(url);
    
        return exporter;
    }
    

    以当前的机器的(ip:port)为key,查看缓存中是否存在Server,没有则创建一个

    默认是NettyServer

    private ProtocolServer createServer(URL url) {
      url = URLBuilder.from(url)
              // send readonly event when server closes, it's enabled by default
              // readonlty请求是否阻塞等待
              .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
              // enable heartbeat by default
              // 心跳间隔
              .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
              // Codec2扩展实现
              .addParameter(CODEC_KEY, DubboCodec.NAME)
              .build();
      // 检测SERVER_KEY参数指定的Transporter扩展实现是否合法
      String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
    
      if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
          throw new RpcException("Unsupported server type: " + str + ", url: " + url);
      }
    
      ExchangeServer server;
      try {
          // 通过Exchangers门面类,创建ExchangeServer对象
          // 处理consumer的请求
                // handler -> 对应netty的handler
          server = Exchangers.bind(url, requestHandler);
      } catch (RemotingException e) {
          throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
      }
    
      // 检测CLIENT_KEY参数指定的Transporter扩展实现是否合法
      str = url.getParameter(CLIENT_KEY);
      if (str != null && str.length() > 0) {
          Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
          if (!supportedTypes.contains(str)) {
              throw new RpcException("Unsupported client type: " + str);
          }
      }
    
      // 将ExchangeServer封装成DubboProtocolServer返回
      return new DubboProtocolServer(server);
    }
    

    注册到注册中心

    @Override
    public Registry getRegistry(URL url) {
        if (destroyed.get()) {
            LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                    "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
            return DEFAULT_NOP_REGISTRY;
        }
    
        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = createRegistryCacheKey(url);
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
                    // 创建注册中心实例
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    
    @Override
    public void register(URL url) {
            // 注册服务
        this.register(new com.alibaba.dubbo.common.URL(url));
    }
    

    相关文章

      网友评论

          本文标题:Dubbo服务暴露

          本文链接:https://www.haomeiwen.com/subject/pghkqrtx.html