美文网首页
Dubbo服务启动过程(三)

Dubbo服务启动过程(三)

作者: 此鱼不得水 | 来源:发表于2017-12-05 19:05 被阅读152次

    在上一节中介绍了Dubbo启动过程中的一个操作:将Dubbo服务注册到Zk上。下面就介绍启动过程的另外一个重要操作:服务监听。

    服务监听逻辑听起来是比较简单的。毕竟就四个字~~服务监听这里的逻辑比较绕,看了好久都感觉有点云里雾里的,所以只能尽力去讲一些自己看懂的地方了,有的地方如果自己说错了请直接评论区回复就好。

    概念澄清

    Channel:Dubbo中的Channel不同于Netty中Channel,也是为了屏蔽底层逻辑而自定义的接口类,比如NettyChannel就是包含了Dubbo中Channel与Netty中的Channel的映射关系。
    ChannelHandler:含义类似于Netty中的ChannelHandler,主要是用于触发某些事件,比如connected,disconnectd,received等事件。

    装饰者模式:在dubbo的源码中充斥着装饰者模式,在我们看到的Handler出现的时候,往往都是装饰者模式在运作着。Handler内部中嵌套着Handler的情景非常之多,所以理清楚每个Handler都干了什么显得十分重要(麻烦)。

    监听端口的过程

    从上一章知道DubboProtocol.export()是这次研究的入口,我们就从这里开始看:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
            
            // export service.
            //这个key就是可以唯一区分所暴露的服务的key,group+interface+version
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            //唯一的服务与exporter映射起来,这样的话在DubboProtocol中就能根据三要素找到具体可以执行的Exporter
            exporterMap.put(key, exporter);
            
            //重点
            openServer(url);
            
            return exporter;
        }
        
        private void openServer(URL url) {
            // find server.
            //host:port
            String key = url.getAddress();
            //client 也可以暴露一个只有server可以调用的服务。
            boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
            if (isServer) {
                //因为Dubbo是基于长链接的,所以每一个client和server只会通过一个长链接来进行通信,所以这里通过client的key与server作为一个映射存储起来
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    //重点
                    serverMap.put(key, createServer(url));
                } else {
                    //server支持reset,配合override功能使用
                    server.reset(url);
                }
            }
        }
        
        private ExchangeServer createServer(URL url) {
            //默认开启server关闭时发送readonly事件
            url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
            //默认开启heartbeat
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
            String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
            // 检查是否有对应的扩展类
            if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
            url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
            ExchangeServer server;
            try {
                //所以重点就在于对于Server的创建上
                //requestHandler可以理解为对于request的处理器
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
            str = url.getParameter(Constants.CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
            return server;
        }
    

    从上文的代码中我们看到,最后的重点落到了server = Exchangers.bind(url, requestHandler);上,在这句代码中创建了server并完成了监听。这里也是最初的Handler的原型,即requestHandler。下面我们先理一下Handler都具体做了哪些:

    public interface ChannelHandler {
    
        /**
         * on channel connected.
         * 
         * @param channel channel.
         */
        void connected(Channel channel) throws RemotingException;
    
        /**
         * on channel disconnected.
         * 
         * @param channel channel.
         */
        void disconnected(Channel channel) throws RemotingException;
    
        /**
         * on message sent.
         * 
         * @param channel channel.
         * @param message message.
         */
        void sent(Channel channel, Object message) throws RemotingException;
    
        /**
         * on message received.
         * 
         * @param channel channel.
         * @param message message.
         */
        void received(Channel channel, Object message) throws RemotingException;
    
        /**
         * on exception caught.
         * 
         * @param channel channel.
         * @param exception exception.
         */
        void caught(Channel channel, Throwable exception) throws RemotingException;
    }
    

    ChannelHandler主要是定义跟“管道“相关的接口,这些操作是在netty收到某类远程传来的消息时候触发的行为,是一个比较底层的接口类,之后我们凡是看到Handler结尾的类,大多数都实现了这个接口。

    public interface ExchangeHandler extends ChannelHandler, TelnetHandler {
    
        /**
         * reply.
         * 
         * @param channel
         * @param request
         * @return response
         * @throws RemotingException
         */
        Object reply(ExchangeChannel channel, Object request) throws RemotingException;
    
    }
    

    ExchangeHandler仅仅是作为Handler的扩展子类,其就多一个方法。但是这个方法恰恰表情ExchangeHandler的作用范围:Exchange层面主要是用于信息交换的层面,用于同步转异步操作,其操作的核心就是Request和Response。上面的reply方法就是将request转换为Response的核心接口方法,定义在ExchangeHandler中也十分清晰。

    然后经过上面的介绍之后我们再看一下requestHandler的具体内容:

    //这个类是装饰者模式下最底层的ChannelHandler类
        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
            
            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                //如果Message是调用请求的话
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    //根据exporter领取相关联的invoker,通过inv参数构建出需要的三元素,然后找到对应的exporter,再通过exporter找到对应的invoker
                    Invoker<?> invoker = getInvoker(channel, inv);
                
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    //最终还是委托到invoker去调用真实的接口
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
            // 因为ExchangeHandlerAdapter也是ChannelHandler的接口实现类,所以需要实现相关接口
            @Override
            public void received(Channel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    reply((ExchangeChannel) channel, message);
                } else {
                    //目前这里并没有其他操作
                    super.received(channel, message);
                }
            }
    
            @Override
            public void connected(Channel channel) throws RemotingException {
                invoke(channel, Constants.ON_CONNECT_KEY);
            }
    
            @Override
            public void disconnected(Channel channel) throws RemotingException {
                if(logger.isInfoEnabled()){
                    logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl());
                }
                invoke(channel, Constants.ON_DISCONNECT_KEY);
            }
            
            //根据三要素构造出来对应的Invocation,然后调用其方法,这里主要处理的是配置的方法,
            private void invoke(Channel channel, String methodKey) {
                Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
                if (invocation != null) {
                    try {
                        received(channel, invocation);
                    } catch (Throwable t) {
                        logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                    }
                }
            }
            
            private Invocation createInvocation(Channel channel, URL url, String methodKey) {
                String method = url.getParameter(methodKey);
                if (method == null || method.length() == 0) {
                    return null;
                }
                RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
                invocation.setAttachment(Constants.PATH_KEY, url.getPath());
                invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
                invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
                invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
                if (url.getParameter(Constants.STUB_EVENT_KEY, false)){
                    invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
                }
                return invocation;
            }
        };
    

    看完了requestHandler,我们再来继续看看那server = Exchangers.bind(url, requestHandler);
    继续跟踪方法看到其实调用的是:
    server = new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    这里的装饰者模式可以看一下:

    ==1.== handler,也就是上面提到的requestHandler

    ==2.== HeaderExchangeHandler 包装了handler

    // 个人理解:Exchange主要是处理Request和Response的逻辑,所以这里就是调动底层的方法将Request的处理结果转换为Response
        Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
            //构造Request对应的Response
            Response res = new Response(req.getId(), req.getVersion());
            if (req.isBroken()) {//如果请求本身有问题的话
                Object data = req.getData();
    
                String msg;
                if (data == null) msg = null;
                else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
                else msg = data.toString();
                res.setErrorMessage("Fail to decode request due to: " + msg);
                res.setStatus(Response.BAD_REQUEST);
    
                return res;
            }
            // 根据massage类型找到对应的Handler
            Object msg = req.getData();
            try {
                // 执行与Exporter交接的最初的Handler
                Object result = handler.reply(channel, msg);
                res.setStatus(Response.OK);
                res.setResult(result);
            } catch (Throwable e) {
                res.setStatus(Response.SERVICE_ERROR);
                res.setErrorMessage(StringUtils.toString(e));
            }
            return res;
        }
    

    这一层主要是借助与requestHandler的返回结果,将其封装成具体的Response然后返回。

    ==3.== DecodeHandler 包装了HeaderExchangeHandler
    DecodeHandler主要是复写了received方法,将收到的请求信息解码然后传入下一层的Handler调用。因为编码解码的整体不打算在本章涉及,所以这里先忽略。

    通过上面的三部,我们看到了这里对外暴露的是DecodeHandler,然后我们继续跟着原来的逻辑看:

    server = new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    进一步跟踪代码可以得到下面的逻辑:
    server = new NettyServer(url, listener),这里的listener其实就是刚才我们装饰者模式下生成的DecodeHandler。然后我们进入到NettyServer看一下。

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
            //在URL山添加对应的线程名称然后包装handler
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    

    这里又是装饰者模式,而且又对刚才的DecodeHandler进行包装,我们看一下包装的逻辑:

    public static ChannelHandler wrap(ChannelHandler handler, URL url){
            return ChannelHandlers.getInstance().wrapInternal(handler, url);
        }
    
        protected ChannelHandlers() {}
    
        
        protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                            .getAdaptiveExtension().dispatch(handler, url)));
        }
    

    从上面的代码中我们可以看到这里又对handler做了三层包装。我们接着上面讲过的三次包装继续看这写包装:

    ==4.== ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()获得的是默认的AllDispatcher,所以这里的第四层包装就是AllChannelHandler。

        public AllChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
        }
        父类构造函数如下:
        public WrappedChannelHandler(ChannelHandler handler, URL url) {
            this.handler = handler;
            this.url = url;
            // 因为我司们日常使用的都是fixThreadPool,所以这里就以FixedThreadPool来一看具体获取到的线程池:
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
            
            //感觉就是区分consumer和provider端,这里看意义不大
            String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
            if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
                componentKey = Constants.CONSUMER_SIDE;
            }
            DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
            dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
        }
        
        // 根据url中取到的参数,定义一个线程池,这个线程池非常重要,在我们的工程中执行任务的线程就是这个线程池中的线程,
        // 在我们的dubbo配置中,threads和queues都是重要的参数
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
                    queues == 0 ? new SynchronousQueue<Runnable>() : 
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>() 
                                : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    

    ==5.== HeartbeatHandler 包装了AllChannelHandler

    这个Handler的操作顾名思义,就是处理心跳有关的操作,我们简单看一下:
        public void connected(Channel channel) throws RemotingException {
            //在channel上设置对应的读写时间
            setReadTimestamp(channel);
            setWriteTimestamp(channel);
            handler.connected(channel);
        }
        该Handler中比较重要的操作是:
        public void received(Channel channel, Object message) throws RemotingException {
            setReadTimestamp(channel);
            //如果是心跳请求或者心跳响应的话就会直接在此步骤进行处理,不会在继续调用之后的handler,减少了不必要的调用。
            if (isHeartbeatRequest(message)) {
                Request req = (Request) message;
                if (req.isTwoWay()) {
                    Response res = new Response(req.getId(), req.getVersion());
                    res.setEvent(Response.HEARTBEAT_EVENT);
                    channel.send(res);
                    if (logger.isInfoEnabled()) {
                        int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                        if(logger.isDebugEnabled()) {
                            logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                            + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                            + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                        }
                    }
                }
                return;
            }
            if (isHeartbeatResponse(message)) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                        new StringBuilder(32)
                            .append("Receive heartbeat response in thread ")
                            .append(Thread.currentThread().getName())
                            .toString());
                }
                return;
            }
            handler.received(channel, message);
        }
        综上来看,有了这一层的好处就是某些请求可以在这里直接处理掉,不用再往之后的handler中传递。
    

    ==6.== MultiMessageHandler包装了HeartbeatHandler

        // 这个是MultiMessageHandler中的核心方法
        @SuppressWarnings("unchecked")
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            // 在所有的Handler的最开端处理,如果请求信息是MultiMessage的话,代表可能是多个请求合并而成的请求,所以遍历处理
            if (message instanceof MultiMessage) {
                MultiMessage list = (MultiMessage)message;
                for(Object obj : list) {
                    handler.received(channel, obj);
                }
            } else {
                handler.received(channel, message);
            }
        }
    

    到这里基本的Handler包装已经差不多完了,然后我们就接着原来的逻辑继续看:

        public NettyServer(URL url, ChannelHandler handler) throws RemotingException{<br>
            //在URL山添加对应的线程名称然后包装handler
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
        
        public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
            //获取服务暴露所需参数,host,port
            localAddress = getUrl().toInetSocketAddress();
            String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                            || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                            ? NetUtils.ANYHOST : getUrl().getHost();
            bindAddress = new InetSocketAddress(host, getUrl().getPort());
            this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
            this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
            try {
                //具体的open操作落到的NettyServer的实现上
                doOpen();
                if (logger.isInfoEnabled()) {
                    logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
                }
            } catch (Throwable t) {
                throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 
                                            + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
            }
            if (handler instanceof WrappedChannelHandler ){
                executor = ((WrappedChannelHandler)handler).getExecutor();
            }
        }
        
        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            //设置线程池(但是线程池中的线程都是守护线程,为的就是当JVM退出时候不用考虑守护线程是否已经结束)
            ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
            ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
            ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
            bootstrap = new ServerBootstrap(channelFactory); //Netty启动类
            //定义NettyHandler(这个应该是通用的Handler,只有在服务启动的时候生效一次)
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            channels = nettyHandler.getChannels();
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                    ChannelPipeline pipeline = Channels.pipeline();
    
                    pipeline.addLast("decoder", adapter.getDecoder()); //增加解码处理器
                    pipeline.addLast("encoder", adapter.getEncoder()); //增加编码处理器
                    pipeline.addLast("handler", nettyHandler); //增加具体操作的处理器
                    return pipeline;
                }
            });
            // bind
            channel = bootstrap.bind(getBindAddress());
        }
        
    

    这里的逻辑就是一般的Netty初始化逻辑,并没有做任何特殊处理的地方。唯一需要注意的就是nettyHandler,nettyHandler中包含了对于当前nettyServer的引用,所以最终的事件处理还是通过上面层层包装的Handler来处理。

    到这里,整体上的服务暴露环节已经讲完。本节的重点就是理解每一层的Handler都干了什么事情,其实这里涉及很多细节的内容都没有讲到,例如参数的含义,Stub,回调的处理等等。因为这次是第一次读源码,所以重点在于对于核心流程的理解,先理解核心概念,后面再在核心流程里面补充其他的细节。

    相关文章

      网友评论

          本文标题:Dubbo服务启动过程(三)

          本文链接:https://www.haomeiwen.com/subject/luacixtx.html