When Netty Comes into Play During Dubbo Service Export

Author: 书唐瑞 | Published: 2020-12-27 16:00

    By default, Dubbo uses Netty for its underlying network communication.

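    Many good articles already cover Dubbo's service export flow. This one follows that flow as its main thread, without explaining every step, to see at which point Netty comes into play.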

    // Entry point of service export:
    // com.alibaba.dubbo.config.spring.ServiceBean#onApplicationEvent
    
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            // Export the service
            export();
        }
    }
    

    Execution then reaches the following code:

    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

    Sample value of registryURLs:
    registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=infuq-dubbo-provider&check=false&dubbo=2.0.2&pid=4916&registry=zookeeper&timestamp=1609052431702
    
    Sample value of protocols:
    <dubbo:protocol name="dubbo" threads="200" port="20880" id="dubbo" /> 
    

    Execution continues to the following code:

    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

    Exporter<?> exporter = protocol.export(wrapperInvoker);
    

    Through Dubbo's SPI mechanism, protocol.export resolves to com.alibaba.dubbo.registry.integration.RegistryProtocol#export, because the URL's protocol is registry. Its source is:

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    
        URL registryUrl = getRegistryUrl(originInvoker);
    
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
    
        boolean register = registeredProviderUrl.getParameter("register", true);
    
        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
    
        if (register) {
            register(registryUrl, registeredProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }
        
        ...
    }
    

    Step into doLocalExport, the first call in that method:

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    // Export through the concrete protocol
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
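
    The check, lock, check-again shape of doLocalExport guarantees that a given key is exported only once, even when several threads arrive together. Below is a minimal standalone sketch of that pattern. The class ExportOnceCache and the method getOrExport are hypothetical names used for illustration and are not part of Dubbo.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of the "bounds" cache in doLocalExport; not Dubbo code.
final class ExportOnceCache<V> {
    private final Map<String, V> bounds = new ConcurrentHashMap<>();

    V getOrExport(String key, Supplier<V> exporter) {
        V existing = bounds.get(key);              // first check, no lock
        if (existing == null) {
            synchronized (bounds) {
                existing = bounds.get(key);        // second check, under the lock
                if (existing == null) {
                    existing = exporter.get();     // the expensive export runs once per key
                    bounds.put(key, existing);
                }
            }
        }
        return existing;
    }

    public static void main(String[] args) {
        ExportOnceCache<String> cache = new ExportOnceCache<>();
        AtomicInteger calls = new AtomicInteger();
        cache.getOrExport("demo-key", () -> "exporter-" + calls.incrementAndGet());
        cache.getOrExport("demo-key", () -> "exporter-" + calls.incrementAndGet());
        System.out.println(calls.get()); // prints 1
    }
}
```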
    

    Again through SPI, protocol.export here resolves to com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#export, since the provider URL's protocol is dubbo. Its source is:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
    
        ...
    
        // Open the server
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
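
    Both SPI hops so far come down to the same idea: the protocol part of the URL selects the Protocol implementation. The sketch below is a deliberately simplified stand-in for that selection. It assumes a plain map lookup keyed by URL scheme. Dubbo's real mechanism (ExtensionLoader with a generated adaptive class and wrapper extensions) is more involved and is not reproduced here. ProtocolDispatchSketch and its members are hypothetical names.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Simplified illustration of "URL protocol selects the implementation"; not Dubbo's ExtensionLoader.
final class ProtocolDispatchSketch {
    interface Protocol {
        String export(String url);
    }

    private static final Map<String, Protocol> EXTENSIONS = new HashMap<>();

    static {
        // Each entry stands in for one SPI extension, keyed by its extension name.
        EXTENSIONS.put("registry", url -> "RegistryProtocol#export");
        EXTENSIONS.put("dubbo", url -> "DubboProtocol#export");
    }

    static String export(String url) {
        String scheme = URI.create(url).getScheme();
        Protocol protocol = EXTENSIONS.get(scheme);
        if (protocol == null) {
            throw new IllegalStateException("No Protocol extension for: " + scheme);
        }
        return protocol.export(url);
    }

    public static void main(String[] args) {
        System.out.println(export("registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService"));
        System.out.println(export("dubbo://192.168.0.102:20880/com.infuq.facade.QueryUserInfoFacade"));
    }
}
```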
    

    Step into openServer:

    private void openServer(URL url) {
    
        String key = url.getAddress();
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                // Create the server
                serverMap.put(key, createServer(url));
            } else {
                server.reset(url);
            }
        }
    }
    
    Sample value of url:
    dubbo://192.168.0.102:20880/com.infuq.facade.QueryUserInfoFacade?anyhost=true&application=infuq-dubbo-provider&bean.name=com.infuq.facade.QueryUserInfoFacade&bind.ip=192.168.0.102&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.infuq.facade.QueryUserInfoFacade&methods=update&pid=13528&revision=1.0.0&side=provider&threads=200&timestamp=1609053786543&version=1.0.0
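
    Because the map key is url.getAddress(), that is the ip:port pair, every service exported on the same address reuses one server instead of opening a new one. A minimal sketch of that bookkeeping follows. The class ServerPerAddressSketch, the string stand-in for a server, and the counter are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of serverMap in openServer; a String stands in for ExchangeServer.
final class ServerPerAddressSketch {
    private final Map<String, String> serverMap = new ConcurrentHashMap<>();
    private int created = 0;

    void openServer(String address) {
        String server = serverMap.get(address);
        if (server == null) {
            serverMap.put(address, createServer(address)); // first service on this address
        }
        // Otherwise the existing server is kept (the original calls server.reset(url) here).
    }

    private String createServer(String address) {
        created++;
        return "server@" + address;
    }

    int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        ServerPerAddressSketch sketch = new ServerPerAddressSketch();
        sketch.openServer("192.168.0.102:20880"); // e.g. first exported service
        sketch.openServer("192.168.0.102:20880"); // second service, same port
        System.out.println(sketch.createdCount()); // prints 1
    }
}
```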
    

    Following the calls further down, these are the main methods invoked in order:

    server = Exchangers.bind(url, requestHandler);
    
    getExchanger(url).bind(url, handler);
    
    new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    
    getTransporter().bind(url, handler);
    
    new NettyServer(url, listener);
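
    The nested constructors in the HeaderExchangeServer line form a decorator chain: the outermost handler sees a message first and passes it inward. The sketch below only demonstrates that ordering for the three handlers named in that line. The Handler interface, the named helper, and the trace list are hypothetical, and any further wrapping the real transport layer may add is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of handler wrapping order; not Dubbo's ChannelHandler API.
final class HandlerChainSketch {
    interface Handler {
        void received(String message, List<String> trace);
    }

    // Builds a handler that records its name, then delegates to the wrapped handler.
    static Handler named(String name, Handler next) {
        return (message, trace) -> {
            trace.add(name);
            if (next != null) {
                next.received(message, trace);
            }
        };
    }

    static List<String> dispatch(String message) {
        Handler requestHandler = named("requestHandler", null);
        // Mirrors: new DecodeHandler(new HeaderExchangeHandler(handler))
        Handler chain = named("DecodeHandler", named("HeaderExchangeHandler", requestHandler));
        List<String> trace = new ArrayList<>();
        chain.received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(dispatch("request"));
        // prints [DecodeHandler, HeaderExchangeHandler, requestHandler]
    }
}
```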
    

    Constructing the NettyServer eventually runs the following code:

    @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();
    
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true));
    
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();
    
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    
    }
    

    This is the familiar Netty server bootstrap code. It creates the server and starts it.


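    In other words, during service export the local export step (doLocalExport) passes through RegistryProtocol#export and DubboProtocol#export in turn, and finally uses Netty to create a server that listens for incoming invocation requests.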
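
    One way to confirm that the provider port is really bound after local export is to attempt a plain TCP connection to it. The helper below is a small sketch using only the JDK. PortProbe and isListening are hypothetical names, and the demo uses a throwaway ServerSocket as a stand-in for the Netty server, so it says nothing about the Dubbo protocol itself.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper: checks whether something accepts TCP connections on host:port.
final class PortProbe {
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unreachable
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in listener on an ephemeral port; with a real provider, probe its port (20880 in the samples).
        try (ServerSocket standIn = new ServerSocket(0)) {
            System.out.println(isListening("127.0.0.1", standIn.getLocalPort(), 500));
        }
    }
}
```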

    Although the service is now exported locally, it still has to be registered with the registry (ZooKeeper, for example).

    Before registration, inspecting ZooKeeper shows no dubbo node. Registration happens in the register call of the same export method:

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // Local export
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        URL registryUrl = getRegistryUrl(originInvoker);
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

        boolean register = registeredProviderUrl.getParameter("register", true);
        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

        if (register) {
            // Register the service
            register(registryUrl, registeredProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        ...
    }
    
    Sample value of registryUrl:
    zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=infuq-dubbo-provider&check=false&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.0.102%3A20880%2Fcom.infuq.facade.QueryUserInfoFacade%3Fanyhost%3Dtrue%26application%3Dinfuq-dubbo-provider%26bean.name%3Dcom.infuq.facade.QueryUserInfoFacade%26bind.ip%3D192.168.0.102%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.infuq.facade.QueryUserInfoFacade%26methods%3Dupdate%26pid%3D9892%26revision%3D1.0.0%26side%3Dprovider%26threads%3D200%26timestamp%3D1609055193386%26version%3D1.0.0&pid=9892&timestamp=1609055193386
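
    The export parameter in this registryUrl carries the whole provider URL in URL-encoded form, which is why it is hard to read. The sketch below pulls that parameter out and decodes it with the JDK's URLDecoder. ExportParamDecoder and exportedUrl are hypothetical names, the sample string is shortened, and the hand-rolled query parsing is an assumption for illustration rather than how Dubbo's URL class does it.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

// Hypothetical helper: extracts and decodes the "export" query parameter of a registry URL.
final class ExportParamDecoder {
    static String exportedUrl(String registryUrl) throws UnsupportedEncodingException {
        int queryStart = registryUrl.indexOf('?');
        if (queryStart < 0) {
            return null;
        }
        for (String pair : registryUrl.substring(queryStart + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals("export")) {
                return URLDecoder.decode(pair.substring(eq + 1), "UTF-8");
            }
        }
        return null; // no export parameter present
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        // Shortened version of the sample; the real value carries many more parameters.
        String sample = "zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService"
                + "?application=infuq-dubbo-provider"
                + "&export=dubbo%3A%2F%2F192.168.0.102%3A20880%2Fcom.infuq.facade.QueryUserInfoFacade%3Fside%3Dprovider"
                + "&pid=9892";
        System.out.println(exportedUrl(sample));
        // prints dubbo://192.168.0.102:20880/com.infuq.facade.QueryUserInfoFacade?side=provider
    }
}
```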
    
    Sample value of registeredProviderUrl:
    dubbo://192.168.0.102:20880/com.infuq.facade.QueryUserInfoFacade?anyhost=true&application=infuq-dubbo-provider&bean.name=com.infuq.facade.QueryUserInfoFacade&dubbo=2.0.2&generic=false&interface=com.infuq.facade.QueryUserInfoFacade&methods=update&pid=9892&revision=1.0.0&side=provider&threads=200&timestamp=1609055193386&version=1.0.0
    

    After register has run, inspecting ZooKeeper again shows the dubbo node, which means the provider is now registered in ZooKeeper.
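
    For orientation, the ZooKeeper registry stores each provider as a node whose name is the URL-encoded provider URL, under a path of the form /dubbo/<interface>/providers/. The sketch below builds such a path with the JDK's URLEncoder. It assumes the default root group dubbo and the providers category, and ProviderPathSketch and providerPath are hypothetical names rather than Dubbo's own code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical illustration of the provider node path layout; assumes root "/dubbo" and category "providers".
final class ProviderPathSketch {
    static String providerPath(String serviceInterface, String providerUrl)
            throws UnsupportedEncodingException {
        return "/dubbo/" + serviceInterface + "/providers/" + URLEncoder.encode(providerUrl, "UTF-8");
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        // Shortened provider URL; the real one carries the full parameter list.
        String path = providerPath(
                "com.infuq.facade.QueryUserInfoFacade",
                "dubbo://192.168.0.102:20880/com.infuq.facade.QueryUserInfoFacade?side=provider");
        System.out.println(path);
    }
}
```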

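    Summary
    While exporting a service, Dubbo first uses Netty to create and start a server that listens for incoming invocation requests. It then registers the service with the registry (ZooKeeper, for example).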

    Article link: https://www.haomeiwen.com/subject/spyonktx.html