美文网首页
dubbo源码解析之server netty启动(四)

dubbo源码解析之server netty启动(四)

作者: binecy | 来源:发表于2019-03-19 20:44 被阅读0次

    源码分析基于dubbo 2.7.1

    前面讲到,RegistryProtocol.doLocalExport会暴露服务

        private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
            String key = getCacheKey(originInvoker);
    
            return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
                Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
                return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
            });
        }
    

    这里providerUrl格式为dubbo://(server ip):(server port)/com.dubbo.start.service.HelloService?...(它是由registryUrl registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?&export=dubbo... 转化以来),
    所以会调用DubboProtocol,但会先经过装饰类ProtocolListenerWrapper,ProtocolFilterWrapper。

    先看看DubboProtocol.export

        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
            
            String key = serviceKey(url);
            // 创建DubboExporter      
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
    
            ...
    
            openServer(url);
            optimizeSerialization(url);
    
            return exporter;
        }
    

    注意一下,这里会构建DubboExporter,并缓存到exporterMap。(后面会用到)

    关键就是openServer

    private void openServer(URL url) {
        String key = url.getAddress();
        // 客户端可以暴露仅供服务器调用的服务
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                server.reset(url);
            }
        }
    }
    

    如果ExchangeServer不存在,就createServer

    private ExchangeServer createServer(URL url) {
        // url 参数处理
        ...
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        ...
    
        return server;
    }
    

    Exchange

    Exchangers.bind(url, requestHandler)这里第二个参数requestHandler是DubboProtocol中的内部类,它是一个关键的类,后面会说到。这里会调用HeaderExchanger.bind

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
    

    Transporter

    Transporter接口同样提供方法bind/connect

    Transporters根据server/transporter参数,通过ExtensionLoader找到一个Transporter实现类,默认使用netty3。

    我配置了netty4,所以会使用com.alibaba.dubbo.remoting.transport.netty4包下的netty代码。
    NettyTransporter

        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }
    

    NettyServer实现了Server接口,该接口提供isBound/getChannels/reset等方法。
    NettyServer构造方法

      public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    

    先看看ChannelHandlers.wrap

        protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
        }
    

    Dispatcher

    这里出现一个新的概念Dispatcher,它是dubbo中的调度器,默认是AllDispatcher

        public ChannelHandler dispatch(ChannelHandler handler, URL url) {
            return new AllChannelHandler(handler, url);
        }
    

    NettyServer继承了AbstractServer,AbstractServer的构造方法会调用NettyServer.doOpen

        protected void doOpen() throws Throwable {
        
            bootstrap = new ServerBootstrap();
            // 构建bossGroup, 线程为1
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
            workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    new DefaultThreadFactory("NettyServerWorker", true));
            // 构建NettyServerHandler
            final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
            channels = nettyServerHandler.getChannels();
    
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            
                            int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                    .addLast("decoder", adapter.getDecoder())
                                    .addLast("encoder", adapter.getEncoder())
                                    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                    .addLast("handler", nettyServerHandler);
                        }
                    });
            // bind
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
        }
    

    .addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder())添加编码器和解码器,将字节码转化为dubbo对象。dubbo协议使用的是DubboCountCodec。

    .addLast("handler", nettyServerHandler) 添加了nettyServerHandler处理请求。

    new NettyServerHandler(getUrl(), this)构建NettyServerHandler时,NettyServe将自身this作为参数,它也实现了ChannelHandler,同时构造NettyServe时也需要传入一个ChannelHandler参数。

    ChannelHandler是处理请求的重要接口,提供received/sent等方法。

    dubbo使用装饰模式,为ChannelHandler添加了不同的功能:
    NettyServe ---> MultiMessageHandler(NettyServer) ---> HeartbeatHandler(NettyServer) ---> AllChannelHandler(AllDispatcher) ---> DecodeHandler(HeaderExchanger) ---> HeaderExchangeHandler(HeaderExchanger) ---> DubboProtocol.requestHandler

    相关文章

      网友评论

          本文标题:dubbo源码解析之server netty启动(四)

          本文链接:https://www.haomeiwen.com/subject/jeigmqtx.html