美文网首页
dubbo服务发布过程浅析

dubbo服务发布过程浅析

作者: mikewt | 来源:发表于2018-02-01 01:32 被阅读0次

     dubbo服务暴露就是一个远程代理,打开网络监听,接受服务调用请求,将服务接口名,IP,port发布到注册中心的过程。通过《dubbo启动过程分析》可以了解到,在spring容器启动时会将容器中所有的bean初始化成单实例(默认),如果bean继承相应的接口,在实例初始化完成后,会调用实现类中某些接口方法。dubbo的初始化也是通过这样一个过程完成的。
    ServiceConfig.export->doExport()-> doExportUrls()->doExportUrlsFor1Protocol()

    //根据url暴露
     private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            String name = protocolConfig.getName();//获取服务提供者协议名称
            if (name == null || name.length() == 0) {
                name = "dubbo";//默认是dubbo
            }
            //获取服务主机名,为空则自动查找本机IP
            if (NetUtils.isInvalidLocalHost(host)) {
                anyhost = true;
                try {
                    host = InetAddress.getLocalHost().getHostAddress();
                } catch (UnknownHostException e) {
                    logger.warn(e.getMessage(), e);
                }
            -----------------------------------------------
        }
     //SPI加载协议实现类中的默认端口常量 
     final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
            if (port == null || port == 0) {//protocol没有配置port
                port = defaultPort;
            }
            if (port == null || port <= 0) {
                port = getRandomPort(name);//使用本机随机可用端口
                if (port == null || port < 0) {
                    port = NetUtils.getAvailablePort(defaultPort);
                    putRandomPort(name, port);
                }
                logger.warn("Use random available port(" + port + ") for protocol " + name);
            }
           //组织参数
           Map<String, String> map = new HashMap<String, String>();
           ..............................................................
           //形成类似dubbo://的统一URL
          URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
    
            String scope = url.getParameter(Constants.SCOPE_KEY);//获取是scope属性
            //配置为none不暴露
            if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
    
                //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
                if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                    exportLocal(url);
                }
                //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
                if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    if (registryURLs != null && registryURLs.size() > 0
                            && url.getParameter("register", true)) {
                        for (URL registryURL : registryURLs) {
                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                            //导入监控中心信息
                            URL monitorUrl = loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }
                             //首先将URL中dubbo替换为registry,创建远程代理Invoker
                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                             //此处protocol为spi加载的适配类,会根据invoker中的protocol不同
                             //调用不同的具体实现类,此处在SPI分析中已经说明
                            //Protocol$Adaptor.export()-->dubbofilterwrapper.export()-->dubbolistenerwrapper.export()-->dubboprotocol.export()
                            Exporter<?> exporter = protocol.export(invoker);
                            exporters.add(exporter);
                        }
                    } else {//本地服务
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
    
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                }
            }
            this.urls.add(url);
    }
    

    接下来将创建接口实现类的包装类,也就是服务的包装类,调用过程如下:
    proxyfactory$adpative.getInvoker()--> StubProxyFactoryWrapper.getInvoker()->JavassistProxyFactory.getInvoker()
    调用Javaassit创建服务包装类,生成的类如下,重点看invokeMethod()这个方法,后面invoker.invoke()会掉到此方法去执行业务逻辑。

    package com.alibaba.dubbo.common.bytecode;
    
    import com.alibaba.dubbo.demo.provider.DemoServiceImpl;
    import java.lang.reflect.InvocationTargetException;
    import java.util.Map;
    
    public class Wrapper1
      extends Wrapper
      implements ClassGenerator.DC
    {
      ...........................................................
      public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)
        throws InvocationTargetException
      {
        DemoServiceImpl localDemoServiceImpl;
        try
        {
          localDemoServiceImpl = (DemoServiceImpl)paramObject;
        }
        catch (Throwable localThrowable1)
        {
          throw new IllegalArgumentException(localThrowable1);
        }
        try
        {
          if ((!"sayHello".equals(paramString)) || (paramArrayOfClass.length == 1)) {
            return localDemoServiceImpl.sayHello((String)paramArrayOfObject[0]);
          }
        }
        catch (Throwable localThrowable2)
        {
          throw new InvocationTargetException(localThrowable2);
        }
      }
    }
    

    最后返回AbstractProxyInvoker的实现,此实现类是一个非常重要的类,包含了服务实现类,服务接口,url,还有刚才生成的服务包装类的引用。
    然后执行到如下的方法:
    Exporter<?> exporter = protocol.export(invoker);
    此处调用的过程为protocol$Adaptive.export()-->ProtocolFilterWrapper.export()--->ProtocolListenerWrapper.export(),两个wrapper什么都没有做,直接放行,最终调到RegistryProtocol.export()
    参数为刚才封装好的AbstractProxyInvoker实现类,然后:
    ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    下面来分析这个方法,这个方法非常重要,就在此方法中调用DubboProtocol完成服务的发布

    private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
            String key = getCacheKey(originInvoker);
            ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                synchronized (bounds) {
                    exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                    if (exporter == null) {
                        final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                        exporter = new ExporterChangeableWrapper<T>
            //此处打开网络监听
    ((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                        bounds.put(key, exporter);
                    }
                }
            }
            return (ExporterChangeableWrapper<T>) exporter;
        }
    

    在doLocalExport()中,将provider url与AbstractProxyInvoker实现类封装到InvokerDelegete对象中,然后执行protocol.export(invokerDelegete)方法,此处后面会重点分析,此方法返回一个DubboExporter对象,将此对象与AbstractProxyInvoker实现类封装在ExporterChangeableWrapper对象中,并存储在RegistryProtocol这个类的bounds属性,这个属性是一个线程安全的map,以便以后服务调用使用。下面分析调用dubbo协议打开网络监听的过程。依然是SPI机制,经过ProtocolFilterWrapper.export(),完成对Invoker的包装,Invoker中加入了Filter调用链。

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
             //SPI机制加载所有Filter扩展
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (filters.size() > 0) {
             //加入Filter执行链
                for (int i = filters.size() - 1; i >= 0; i --) {
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
                    .......................................................
                        public Result invoke(Invocation invocation) throws RpcException {
                            return filter.invoke(next, invocation);
                        }
                    .......................................................
                    };
                }
            }
            return last;
        }
    

    然后经过ProtocolListenerWrapper.export()调到DubboProtocol.export(),将结果封装成ListenerExporterWrapper返回。

     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
            ...................................................
            openServer(url);
            
            return exporter;
        }
    

    将前面传递过来的invoker,key,exporterMap封装到DubboExporter中,打开server此处是后续调用netty打开网络监听,最终返回到DubboProtocol处的是HeaderExchangeServer,此对象持有nettyserver,在nettyserver构造方法中,有两个构造参数一个是url,另一个是DecodeHandler,此对象里又封装了HeaderExchangeHandler,又封装了ExchangeHandler。初始化了nettyserver的基本参数如:ip,port,timeout等等。具体调用流程如下:
    openServer(url)-->createServer(url)-->Exchangers.bind(url, requestHandler)-->HeaderExchanger.bind()-->NettyTransporter.bind()-->nettyserver.doopen()
    在doopen()方法中,就是对netty的初始化操作,设置线程池,绑定decoder、encoder、handler,然后打开端口,进行监听。

      @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
            ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
            ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
            bootstrap = new ServerBootstrap(channelFactory);
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            channels = nettyHandler.getChannels();
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
            channel = bootstrap.bind(getBindAddress());
        }
    

    打开网络监听后,再次回到RegistryProtocol类中,下面就开始调用注册中心进行服务注册和订阅。首先调用getRegistry(originInvoker),spi机制,初始化时注入的是registryFactory适配类,根据url中注册中心参数获取具体的实现类,此处是ZookeeperRegistryFactory

    private Registry  getRegistry(final Invoker<?> originInvoker){
            URL registryUrl = originInvoker.getUrl();
            if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
                String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
                registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
            }
            return registryFactory.getRegistry(registryUrl);
        }
    

    调用到AbstractRegistryFactory.getRegistry()>ZookeeperRegistryFactory.createRegistry(),最终返回一个 ZookeeperRegistry对象

    public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
        //此处是spi机制注入的适配类
        private ZookeeperTransporter zookeeperTransporter;
        public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
            this.zookeeperTransporter = zookeeperTransporter;
        }
        public Registry createRegistry(URL url) {
            return new ZookeeperRegistry(url, zookeeperTransporter);
        }
    }
    

    返回到RegistryProtocol.export(),获得到了 ZookeeperRegistry,然后调用registry.register(registedProviderUrl)进行注册,进一步跟踪,调用链路为:FailbackRegistry.register()-->AbstractRegistry.register()-->ZookeeperRegistry.doRegister(),在zk上创建一个临时节点,注册完成

    protected void doRegister(URL url) {
            try {
                 //在zk上创建一个临时节点
                zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    回到RegistryProtocol,注册完成之后,下面开始订阅,需要感知zk node的变化,此处使用的zk的watcher机制,首先初始化一个NotifyListener,后面监听变化调用到此对象中的notify方法。

     final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
     final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
     overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
     registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    

    调用链路:FailbackRegistry.subscribe()-->AbstractRegistry.subscribe()-->ZookeeperRegistry.doSubscribe(),主要是注册主节点和子节点监听

    protected void doSubscribe(final URL url, final NotifyListener listener) {
     .......................
       List<URL> urls = new ArrayList<URL>();
                    for (String path : toCategoriesPath(url)) {
                       //zkListeners根据key存储了主节点和子节点监听
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                        if (listeners == null) {//没有 初始化
                            zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                            listeners = zkListeners.get(url);
                        }
                       //根据主节点监听获取子节点监听
                        ChildListener zkListener = listeners.get(listener);
                        if (zkListener == null) {//没有,new一个
                            listeners.putIfAbsent(listener, new ChildListener() {
                                public void childChanged(String parentPath, List<String> currentChilds) {
                                    //节点删除,更新会触发notify方法
                                    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                                }
                            });
                            zkListener = listeners.get(listener);
                        }
                        zkClient.create(path, false);//创造一个永久节点??
                       //添加子节点监听器
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
                    notify(url, listener, urls);
                }
     .......................
    }
    

    notify(url, listener, urls)调用链比较复杂,链路为:
    FailbackRegistry.notify()-->FailbackRegistry.doNotify()-->AbstractRegistry.notify()-->listener.notify(),此处的listener为刚才registryProtocol传递过来的OverrideListener,然后调用OverrideListener.notify()-->RegistryProtocol.doChangeLocalExport(),对修改了url的invoker重新export,至此整个发布过程全部完成。

    private <T> void doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl){
            String key = getCacheKey(originInvoker);
            final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null){
                logger.warn(new IllegalStateException("error state, exporter should not be null"));
                return ;//不存在是异常场景 直接返回 
            } else {
                final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl);
                //调用dubboprotocol.export()重新发布
                exporter.setExporter(protocol.export(invokerDelegete));
            }
        }
    

    相关文章

      网友评论

          本文标题:dubbo服务发布过程浅析

          本文链接:https://www.haomeiwen.com/subject/uinlzxtx.html