Dubbo Service Publishing

Author: 剑道_7ffc | Published: 2020-06-06 09:21

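    Functional overview

    1 Parsing the configuration file
    2 Service registration: the exported object is kept in a map, and a node is created in ZooKeeper
    3 Starting a server-side listener
    4 Network communication, serialization and deserialization

    Dubbo's extension of Spring

    Spring custom tag extension

    Custom configuration is hooked in through the META-INF/spring.handlers file, which holds key-value pairs: the namespace URL is the key and the corresponding handler class is the value. When Spring parses the configuration, the resolve method of DefaultNamespaceHandlerResolver looks up the handler for each namespace.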
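
    The lookup described above can be sketched in plain Java. This is only an illustration of the key-value resolution, not Spring's actual resolver: HandlerMappingSketch is a hypothetical name, and the SAMPLE line is assumed to match the style of Dubbo's spring.handlers entry.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: mimics how a namespace URL is resolved to a handler class name.
final class HandlerMappingSketch {
    // Assumed sample content in the style of META-INF/spring.handlers.
    // The ':' is escaped because it is a key/value separator in .properties files.
    static final String SAMPLE =
            "http\\://dubbo.apache.org/schema/dubbo="
            + "org.apache.dubbo.config.spring.schema.DubboNamespaceHandler\n";

    // Returns the handler class name registered for the namespace URL, or null.
    static String resolveHandlerClassName(String namespaceUrl) {
        Properties mappings = new Properties();
        try {
            mappings.load(new StringReader(SAMPLE));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mappings.getProperty(namespaceUrl);
    }
}
```

    Resolving a namespace that has no entry simply returns null here; what the real resolver does in that case is outside this sketch.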


    org.apache.dubbo.config.spring.schema.DubboNamespaceHandler.init

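    Each configuration element is parsed into its corresponding Config class. Three of them are special: ServiceBean, ReferenceBean and ConfigCenterBean each extend their corresponding Config class (ServiceConfig, ReferenceConfig and ConfigCenterConfig).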

    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
        registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
    

    org.apache.dubbo.config.spring.schema.DubboBeanDefinitionParser.parse

    This method parses the configuration file. For example, the code that handles the class attribute of a ServiceBean is:

    String className = element.getAttribute("class");
    if (className != null && className.length() > 0) {
        RootBeanDefinition classDefinition = new RootBeanDefinition();
        classDefinition.setBeanClass(ReflectUtils.forName(className));
        classDefinition.setLazyInit(false);
        parseProperties(element.getChildNodes(), classDefinition);
        beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
    }
    

    ServiceBean implementation

    The ServiceBean class implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener,
    BeanNameAware and ApplicationEventPublisherAware.

    org.springframework.beans.factory.InitializingBean#afterPropertiesSet

    Called when the bean is initialized, after its properties have been set.

    org.springframework.beans.factory.DisposableBean#destroy

    Called when the bean is destroyed.

    org.springframework.context.ApplicationContextAware#setApplicationContext

    Gives the bean access to the ApplicationContext container.

    org.springframework.context.ApplicationListener#onApplicationEvent

    Listens for ApplicationEvent events.

    org.springframework.beans.factory.BeanNameAware#setBeanName

    Sets the bean's name.

    org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher

    Provides the publisher used to publish events.

    Spring event publishing and listening

    ApplicationEvent: the event itself
    ApplicationEventPublisherAware: gives access to the event publisher
    ApplicationListener: the event listener
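
    The relationship between the three roles can be sketched without Spring. All types below (SketchEvent, SketchListener, SketchPublisher, SketchServiceBean) are hypothetical stand-ins, and the "contextRefreshed" name is an assumption used only to mirror ContextRefreshedEvent.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for ApplicationEvent: the event itself.
final class SketchEvent {
    final String name;
    SketchEvent(String name) { this.name = name; }
}

// Stand-in for ApplicationListener: receives events.
interface SketchListener {
    void onEvent(SketchEvent event);
}

// Stand-in for the publisher a bean obtains through ApplicationEventPublisherAware.
final class SketchPublisher {
    private final List<SketchListener> listeners = new ArrayList<>();

    void addListener(SketchListener listener) { listeners.add(listener); }

    void publish(SketchEvent event) {
        for (SketchListener listener : listeners) {
            listener.onEvent(event);
        }
    }
}

// A listener that "exports" once, the first time a refresh event arrives.
final class SketchServiceBean implements SketchListener {
    int exportCount;

    @Override
    public void onEvent(SketchEvent event) {
        if ("contextRefreshed".equals(event.name) && exportCount == 0) {
            exportCount++; // stands in for export()
        }
    }
}
```

    Publishing the refresh event twice still exports only once, which is the same guard ServiceBean applies with isExported().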

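    Service export flow in ServiceBean

    afterPropertiesSet

    Stores the Dubbo configuration in the ServiceBean so that it can be used later.

    onApplicationEvent

    Triggered when the container is initialized or refreshed.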

    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export(); // export (publish) the service
        }
    }
    
    export

    Publishes the service.

        public void export() {
            super.export();
            // Publish ServiceBeanExportedEvent
            publishExportEvent();
        }
    

    The ServiceConfig configuration class

    export
        public synchronized void export() {
            checkAndUpdateSubConfigs(); // check and update the configuration
    
            if (!shouldExport()) { // should this service be exported at all?
                return;
            }
    
            if (shouldDelay()) { // should the export be delayed?
                delayExportExecutor.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
            } else {
                doExport();
            }
        }
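
    The immediate versus delayed branch can be tried out with a small stand-alone sketch. DelayedExportSketch and its fields are hypothetical names; only the use of ScheduledExecutorService.schedule mirrors the excerpt above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of "export now, or schedule the export for later".
final class DelayedExportSketch {
    // Daemon thread so that a pending delayed export does not keep the JVM alive.
    private final ScheduledExecutorService delayExecutor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "sketch-delay-export");
                t.setDaemon(true);
                return t;
            });
    private final AtomicBoolean exported = new AtomicBoolean(false);
    private final long delayMillis;

    DelayedExportSketch(long delayMillis) { this.delayMillis = delayMillis; }

    synchronized void export() {
        if (delayMillis > 0) {
            // Delayed: doExport runs later on the scheduler thread.
            delayExecutor.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            // No delay configured: export on the calling thread.
            doExport();
        }
    }

    private void doExport() { exported.set(true); }

    boolean isExported() { return exported.get(); }
}
```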
    
    doExportUrls
    private void doExportUrls() {
        // Load the registries and generate their URLs.
        // The URL drives the rest of the flow, for example:
        // registry://47.110.245.187:2181/org.apache.dubbo.registry.RegistryService?application=pay-service
        // &dubbo=2.0.2&pid=18104&registry=zookeeper&release=2.7.2&timestamp=1591225813156
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            // key composed of interface path, group and version
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            // store the metadata of the exported service
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
    
    doExportUrlsFor1Protocol

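    1 Parse the <dubbo:method> parameters configured for the current service and store them in a map
    2 Resolve the IP address and port to expose

    // resolve the host and port to bind
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    

    3 Assemble the URL object, for example:
    dubbo://169.254.108.117:20880/com.my.dubbo.IPayService

    URL url = new URL (name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    

    4 scope
    There are two scopes, local and remote. local exposes the service for in-JVM calls, that is, local Dubbo invocation; remote publishes the service remotely through the registry. As the code shows, when no scope is configured the service is exported both ways.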

    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

        // export to local unless the scope is remote
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            // excerpt: other steps omitted
            for (URL registryURL : registryURLs) { // registryURL: registry://ip:port...
                // invoker -> proxy around the service implementation
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                // delegate that carries the service metadata
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
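
    The scope decision boils down to two independent checks. The sketch below assumes the constant values "none", "local" and "remote"; ScopeSketch is a hypothetical name.

```java
// Hypothetical sketch of the scope checks; the string values are assumptions
// standing in for SCOPE_NONE, SCOPE_LOCAL and SCOPE_REMOTE.
final class ScopeSketch {
    // Local (in-JVM) export happens unless scope is "none" or "remote".
    static boolean exportsLocal(String scope) {
        return !"none".equalsIgnoreCase(scope) && !"remote".equalsIgnoreCase(scope);
    }

    // Remote export happens unless scope is "none" or "local".
    static boolean exportsRemote(String scope) {
        return !"none".equalsIgnoreCase(scope) && !"local".equalsIgnoreCase(scope);
    }
}
```

    With a null scope (nothing configured) both methods return true, so the service is exported locally and remotely.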
    
    protocol.export

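    protocol is a method-level adaptive extension point: Dubbo generates a Protocol$Adaptive class at runtime. Because the URL starts with registry://, the call ends up in RegistryProtocol.export. The generated Protocol$Adaptive looks like this: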

    public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
        public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
            if (arg1 == null) throw new IllegalArgumentException("url == null");
            org.apache.dubbo.common.URL url = arg1;
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.refer(arg0, arg1);
        }
    
        public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
            if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null)
                throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
            org.apache.dubbo.common.URL url = arg0.getUrl();
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.export(arg0);
        }
    }
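
    The dispatch idea in the generated class, picking an implementation by the URL's protocol, can be sketched in isolation. SketchProtocol and AdaptiveProtocolSketch are hypothetical, java.net.URI stands in for Dubbo's URL, and a plain map stands in for ExtensionLoader.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, much smaller stand-in for org.apache.dubbo.rpc.Protocol.
interface SketchProtocol {
    String export(URI url);
}

// Chooses the real implementation from the URL scheme, as the adaptive class does.
final class AdaptiveProtocolSketch implements SketchProtocol {
    private final Map<String, SketchProtocol> extensions = new HashMap<>();

    AdaptiveProtocolSketch() {
        // The map replaces the extension lookup by name; the values are placeholders.
        extensions.put("registry", url -> "RegistryProtocol.export");
        extensions.put("dubbo", url -> "DubboProtocol.export");
    }

    @Override
    public String export(URI url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        // Fall back to "dubbo" when the URL carries no protocol.
        String extName = url.getScheme() == null ? "dubbo" : url.getScheme();
        SketchProtocol extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.export(url);
    }
}
```

    A registry:// URL is routed to the registry implementation and a dubbo:// URL to the Dubbo one, which is why the same protocol.export call appears twice in the export flow with different targets.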
    

    RegistryProtocol.export

    Exports the service:
    // registryUrl -> zookeeper://ip:port
    URL registryUrl = getRegistryUrl(originInvoker);
    // providerUrl -> dubbo:// ip:port
    URL providerUrl = getProviderUrl(originInvoker);
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
    Registers the dubbo:// URL in ZooKeeper:
    register(registryUrl, registeredProviderUrl);
    
    doLocalExport

    Starts a Netty server.

    return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    

    DubboProtocol.export

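    Wrapper decoration

    The Protocol is decorated by Wrapper classes, which enhance its methods.
    1 org.apache.dubbo.common.extension.ExtensionLoader#isWrapperClass
    If the class has a constructor that takes the extension type as its only parameter, it is a wrapper class; otherwise it is not.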

    private boolean isWrapperClass(Class<?> clazz) {
        try {
            //type=Protocol.class
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
    

    2 org.apache.dubbo.common.extension.ExtensionLoader#cacheWrapperClass
    Puts the wrapper class into the cache.

    private void cacheWrapperClass(Class<?> clazz) {
        if (cachedWrapperClasses == null) {
            cachedWrapperClasses = new ConcurrentHashSet<>();
        }
        cachedWrapperClasses.add(clazz);
    }
    

    3 org.apache.dubbo.common.extension.ExtensionLoader#createExtension
    Fetches the wrapper classes from the cache and wraps the instance with them.

    Set<Class<?>> wrapperClasses = cachedWrapperClasses;
    if (CollectionUtils.isNotEmpty(wrapperClasses)) {
        for (Class<?> wrapperClass : wrapperClasses) {
            instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
        }
    }
    

    Dubbo's extension configuration files declare three wrapper classes for Protocol: ProtocolFilterWrapper, ProtocolListenerWrapper and QosProtocolWrapper. The resulting decorator chain is: QosProtocolWrapper / ProtocolListenerWrapper / ProtocolFilterWrapper / DubboProtocol
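
    The three steps above (detect a wrapper by its constructor, then wrap the instance with each wrapper) can be sketched with plain reflection. Proto, CoreProto, FilterWrapperProto, ListenerWrapperProto and WrapperSketch are all hypothetical names; the real classes do far more.

```java
// Hypothetical extension type, standing in for Protocol.
interface Proto {
    String export(String service);
}

// The real implementation: no constructor taking Proto, so not a wrapper.
final class CoreProto implements Proto {
    public CoreProto() {}
    @Override public String export(String service) { return "core(" + service + ")"; }
}

// A wrapper: it has a public constructor whose only parameter is the extension type.
final class FilterWrapperProto implements Proto {
    private final Proto next;
    public FilterWrapperProto(Proto next) { this.next = next; }
    @Override public String export(String service) { return "filter>" + next.export(service); }
}

final class ListenerWrapperProto implements Proto {
    private final Proto next;
    public ListenerWrapperProto(Proto next) { this.next = next; }
    @Override public String export(String service) { return "listener>" + next.export(service); }
}

final class WrapperSketch {
    // Same test as isWrapperClass: does a constructor taking the extension type exist?
    static boolean isWrapperClass(Class<?> clazz, Class<?> type) {
        try {
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Wraps the instance with every candidate that qualifies as a wrapper, in order.
    static Proto wrap(Proto instance, Class<?>... candidates) {
        try {
            for (Class<?> candidate : candidates) {
                if (isWrapperClass(candidate, Proto.class)) {
                    instance = (Proto) candidate.getConstructor(Proto.class).newInstance(instance);
                }
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

    Each wrapper applied later ends up further out in the chain, so calls pass through the outermost wrapper first and reach the real implementation last.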

    The ProtocolFilterWrapper wrapper class

    Uses activate extension points to build the Filter chain.

    protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    return new CallbackRegistrationInvoker<>(last, filters);
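
    One common way to turn a list of filters into a chain around an invoker is to link them from the last filter backwards. The sketch below shows that general technique with hypothetical types (SketchInvoker, SketchFilter, FilterChainSketch); it is not a copy of Dubbo's buildInvokerChain.

```java
import java.util.List;

// Hypothetical stand-in for Invoker.
interface SketchInvoker {
    String invoke(String request);
}

// Hypothetical stand-in for Filter: it may act before and after calling next.
interface SketchFilter {
    String invoke(SketchInvoker next, String request);
}

final class FilterChainSketch {
    // Builds the chain back to front so that filters.get(0) runs first.
    static SketchInvoker buildInvokerChain(SketchInvoker invoker, List<SketchFilter> filters) {
        SketchInvoker last = invoker;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final SketchFilter filter = filters.get(i);
            final SketchInvoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }
}
```

    With an empty filter list the original invoker is returned unchanged.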
    

    org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
    
        // export service.
        // Get the service key, which can be thought of as the service's coordinates. It is built from the group, the service name, the version and the port, e.g. ${group}/copm.my.practice.dubbo.ISayHelloService:${version}:20880
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
    
        openServer(url); // start a server and expose port 20880
        optimizeSerialization(url);  // optimize serialization
    
        return exporter;
    }
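
    The key format described in the comment above can be sketched as a small helper. ServiceKeySketch is a hypothetical name, and the handling of empty group and version is an assumption based only on the ${group}/service:${version}:port pattern.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of building a service key and storing an exporter under it.
final class ServiceKeySketch {
    // Stand-in for exporterMap: service key -> exporter (any object here).
    static final Map<String, Object> EXPORTER_MAP = new ConcurrentHashMap<>();

    // Builds "group/serviceName:version:port", skipping empty group or version.
    static String serviceKey(String group, String serviceName, String version, int port) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }
}
```

    Because the key includes the port, the same interface exported on two ports yields two distinct entries in the map.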
    

    org.apache.dubbo.remoting.exchange.Exchangers#bind

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    
        //HeaderExchanger.
        return getExchanger(url).bind(url, handler);
    }
    

    org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind

        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    

    org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }
    

    org.apache.dubbo.remoting.transport.AbstractServer#AbstractServer

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
    
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }
    

    org.apache.dubbo.remoting.transport.netty.NettyServer#doOpen

    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);
    
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
    

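    What is an Invoker

    Service publishing has three stages:
    Stage 1: an Invoker is created.
    Stage 2: the Invoker, after a series of processing steps (various wrappers), is stored in exporterMap in DubboProtocol.
    Stage 3: the dubbo:// URL is registered with the registry.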

    org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol

    // obtain the proxy (the Invoker)
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
    

    org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper.getInvoker

    A wrapper class; it delegates to the wrapped ProxyFactory.

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        return proxyFactory.getInvoker(proxy, type, url);
    }
    

    org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory.getInvoker

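    ProxyFactory is an adaptive extension point; when nothing else is configured it uses the default value declared by @SPI("javassist").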

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        /**
         * proxy: the object that implements the service interface
         * type: the service interface type
         * url: registry://ip:port...
         */
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    

    Dynamic proxy code generated by javassist

    org.apache.dubbo.common.bytecode.Wrapper#makeWrapper: generates a class dynamically from bytecode and instantiates it.

    public Object invokeMethod (Object o, String n, Class[]p, Object[]v) throws
    java.lang.reflect.InvocationTargetException {
        com.my.dubbo.PayServiceImpl w;
        try {
            w = ((com.my.dubbo.PayServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("pay".equals($2) && $3.length == 1) {
                return ($w) w.pay((java.lang.String) $4[0]);
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.my.dubbo.PayServiceImpl.");
    }
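
    Written out as ordinary Java, the generated method is a name-based dispatcher that calls the target directly instead of going through java.lang.reflect.Method. The sketch below uses hypothetical classes (PayServiceSketch, PayServiceWrapperSketch) and unchecked exceptions to stay short.

```java
// Hypothetical service implementation, standing in for com.my.dubbo.PayServiceImpl.
final class PayServiceSketch {
    String pay(String info) {
        return "paid:" + info;
    }
}

// Hand-written equivalent of what the generated wrapper does for one class.
final class PayServiceWrapperSketch {
    Object invokeMethod(Object instance, String methodName,
                        Class<?>[] parameterTypes, Object[] args) {
        final PayServiceSketch target;
        try {
            target = (PayServiceSketch) instance;
        } catch (ClassCastException e) {
            throw new IllegalArgumentException(e);
        }
        // Dispatch on the method name and parameter count, then call directly.
        if ("pay".equals(methodName) && parameterTypes.length == 1) {
            return target.pay((String) args[0]);
        }
        throw new IllegalStateException(
                "Not found method \"" + methodName + "\" in class PayServiceSketch.");
    }
}
```

    The dispatch is a chain of string comparisons rather than a reflective lookup.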
    
