美文网首页
dubbo的调用过程

dubbo的调用过程

作者: 7d972d5e05e8 | 来源:发表于2019-10-25 20:23 被阅读0次

    一、消费者发起请求

    1.1 调用入口

    在@Reference注入的bean的invoke方法,即Invoker.invoke。

    public interface Invoker<T> extends Node {
        /**
         * get service interface.
         *
         * @return service interface.
         */
        Class<T> getInterface();
    
        /**
         * invoke.
         *
         * @param invocation
         * @return result
         * @throws RpcException
         */
        Result invoke(Invocation invocation) throws RpcException;
    }
    

    然后依次调用的路径:
    MockClusterInvoker.invoke -> //用来支持mock
    AbstractClusterInvoker.invoke -> //用来加载指定的负载均衡策略
    FailoverClusterInvoker.doInvoke -> //用来负载均衡选择具体哪个提供者的invoker
    AbstractInvoker.invoke -> 用来初始化一些数据
    DubboInvoker.doInvoke -> 用来执行invoke调用
    HeaderExchangeClient.request -> 用来执行网络调用请求
    HeaderExchangeChannel.request -> 用来执行封装Request请求
    AbstractPeer.send ->
    NettyChannel.send ->
    暂时到这里。想一下为啥会上上面的调用路径呢?因为真正的DubboInvoke会被包装为很多层。比如为了满足服务治理:使用FailoverClusterInvoker,为了满足mock使用MockClusterInvoker,为了满足过滤器,使用ProtocolFilterWrapper这里没有显示出来,等等。
    这里从DubboInvoker的doInvoke开始看,源码如下:

     @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                } else if (isAsync) {
                    ResponseFuture future = currentClient.request(inv, timeout);
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {
                    RpcContext.getContext().setFuture(null);
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    可以看到,正常调用都是双向的,且是同步的。所以会走到else里面。如下:

     RpcContext.getContext().setFuture(null);
     return (Result) currentClient.request(inv, timeout).get();
    

    这里有个关键的地方:用户线程在发送完request请求后,使用get()方法阻塞本次调用的用户线程,等待ResponseFuture的返回。而该ResponseFuture的实现类DefaultFuture。下面看下DefaultFuture的get方法:

     public Object get() throws RemotingException {
            return get(timeout);
        }
    
        public Object get(int timeout) throws RemotingException {
            if (timeout <= 0) {
                timeout = Constants.DEFAULT_TIMEOUT;
            }
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    while (!isDone()) {
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                if (!isDone()) {
                    throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
                }
            }
            return returnFromResponse();
        }
    

    明显可以看到!isDone的话,线程await超时时间,阻塞这里等待回来的数据。用户线程到这里就结束了。等待request返回,并唤醒该线程。
    下面我看下request方法:

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            Request req = new Request();
            req.setVersion("2.0.0");
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }
    

    到这里看到future=new DefaultFuture()了。然后调用了channel.send(req),后面就是netty的框架了,下节会介绍。

    1.2 消费者调用过程---NIO发送请求

    这里主要是介绍:NettyChannel.send方法:

    public void send(Object message, boolean sent) throws RemotingException {
            super.send(message, sent);
    
            boolean success = true;
            int timeout = 0;
            try {
                ChannelFuture future = channel.write(message);
                if (sent) {
                    timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                    success = future.await(timeout);
                }
                Throwable cause = future.getCause();
                if (cause != null) {
                    throw cause;
                }
            } catch (Throwable e) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
            }
    
            if (!success) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                        + "in timeout(" + timeout + "ms) limit");
            }
        }
    

    追踪下去,到了:

      public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
            ChannelFuture future = future(channel);
            channel.getPipeline().sendDownstream(
                    new DownstreamMessageEvent(channel, future, message, remoteAddress));
            return future;
        }
    

    目前到这里,我们的调用路径如下:


    dubbo调用栈1.png

    Channels.write方法源码如上,调用了sendDownstream方法。在追踪下去,sendDownstream源码如下:

     void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
            if (e instanceof UpstreamMessageEvent) {
                throw new IllegalArgumentException("cannot send an upstream event to downstream");
            }
            
            try {
                ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
            } catch (Throwable t) {
                // Unlike an upstream event, a downstream event usually has an
                // incomplete future which is supposed to be updated by ChannelSink.
                // However, if an exception is raised before the event reaches at
                // ChannelSink, the future is not going to be updated, so we update
                // here.
                e.getFuture().setFailure(t);
                notifyHandlerException(e, t);
            }
        }
    

    可以看到handleDownstream方法,由指定handler执行。这里使用OneToOneEncoder执行:

     public void handleDownstream(
                ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
            if (!(evt instanceof MessageEvent)) {
                ctx.sendDownstream(evt);
                return;
            }
    
            MessageEvent e = (MessageEvent) evt;
            Object originalMessage = e.getMessage();
            Object encodedMessage = encode(ctx, e.getChannel(), originalMessage);
            if (originalMessage == encodedMessage) {
                ctx.sendDownstream(evt);
            } else if (encodedMessage != null) {
                write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
            }
        }
    

    encode方法就是模板方法了,实现由子类来实现。然后,追踪到在NettyCodecAdapter里面,有个内部类InternalEncoder实现了encode方法。

    
        @Sharable
        private class InternalEncoder extends OneToOneEncoder {
    
            @Override
            protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
                com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                        com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
                NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
                try {
                    codec.encode(channel, buffer, msg);
                } finally {
                    NettyChannel.removeChannelIfDisconnected(ch);
                }
                return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
            }
        }
    

    最后,这里的codec.encode(channel, buffer, msg)委托给了DubboCountCodec.encode
    总结:

    这节内容把消费者的send请求和编码,序列化等底层操作结合起来了。不能追踪到netty层就不向下了,其实dubbo拓展了很多netty的类。导致虽然调用已经走到netty框架,但是很多业务处理,netty还需要回调netty拓展的功能。这种细节还是不能马虎,需要搞懂。举个例子:org.jboss.netty.handler.codec.oneone.OneToOneEncoder是netty的抽象类,必须由子类实现,然后netty在调用的时候,会调用dubbo实现的子类。这里是InternalEncoder类。环环相扣=_=

    紧接着上面的encode结束后,调用了:write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress())方法,继续追踪:

     public void sendDownstream(ChannelEvent e) {
                DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
                if (prev == null) {
                    try {
                        getSink().eventSunk(DefaultChannelPipeline.this, e);
                    } catch (Throwable t) {
                        notifyHandlerException(e, t);
                    }
                } else {
                    DefaultChannelPipeline.this.sendDownstream(prev, e);
                }
            }
    

    该方法属于DefaultChannelPipeline.java类,prev必然为null,然后调用 getSink().eventSunk(DefaultChannelPipeline.this, e)方法,继续追踪到NioClientSocketPipelineSink类,如下:

     public void eventSunk(
                ChannelPipeline pipeline, ChannelEvent e) throws Exception {
            if (e instanceof ChannelStateEvent) {
                ChannelStateEvent event = (ChannelStateEvent) e;
                NioClientSocketChannel channel =
                    (NioClientSocketChannel) event.getChannel();
                ChannelFuture future = event.getFuture();
                ChannelState state = event.getState();
                Object value = event.getValue();
    
                switch (state) {
                case OPEN:
                    if (Boolean.FALSE.equals(value)) {
                        channel.worker.close(channel, future);
                    }
                    break;
                case BOUND:
                    if (value != null) {
                        bind(channel, future, (SocketAddress) value);
                    } else {
                        channel.worker.close(channel, future);
                    }
                    break;
                case CONNECTED:
                    if (value != null) {
                        connect(channel, future, (SocketAddress) value);
                    } else {
                        channel.worker.close(channel, future);
                    }
                    break;
                case INTEREST_OPS:
                    channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                    break;
                }
            } else if (e instanceof MessageEvent) {
                MessageEvent event = (MessageEvent) e;
                NioSocketChannel channel = (NioSocketChannel) event.getChannel();
                boolean offered = channel.writeBuffer.offer(event);
                assert offered;
                channel.worker.writeFromUserCode(channel);
            }
        }
    

    到了这里,终于看到我们想要的事件了。这里根据事件类型选择了MessageEvent。看到调用了writeFromUserCode方法,在后面就只有netty的代码了,调用了write0方法。这里就不在向下追踪了,有兴趣的可以自己看netty。
    调用链路如下,紧接着上面的图:


    dubb调用栈2.png

    总结:到了writeFromUserCode这里,总算把dubbo请求发送给了网络。剩下就是网络包通过TCP/IP协议,传到提供者ip那里去了。在消费者发送请求的过程中,一直使用了是一个线程,在线程执行完request的send操作后,同步得到一个Future,然后一直阻塞在Future.get()方法上,等待返回值。其实这个调用过程抽象出来就是:
    1.服务治理选出一个最佳的提供者ip
    2.执行invoker的各种filter,类似AOP功能
    3.构造请求request,然后encode请求参数,便于网络传输
    4.把序列化好后的二进制流传递给netty
    5.netty把数据发送到网络上

    二、提供者接收请求

    参考文档:http://dubbo.apache.org/zh-cn/docs/source_code_guide/service-invoking-process.html

    提供者会一直在dubbo指定的端口上,while(true)监听channel中有没有数据达到。通过netty的reactor模式,netty的IO线程监听有数据达到,然后看是什么事件,select唤起对应的事件处理器。事件处理,由单独的线程池去完成。先看看,在提供者export服务的时候,bind的代码:

     public Channel bind(final SocketAddress localAddress) {
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
    
            final BlockingQueue<ChannelFuture> futureQueue =
                new LinkedBlockingQueue<ChannelFuture>();
    
            ChannelHandler binder = new Binder(localAddress, futureQueue);
            ChannelHandler parentHandler = getParentHandler();
    
            ChannelPipeline bossPipeline = pipeline();
            bossPipeline.addLast("binder", binder);
            if (parentHandler != null) {
                bossPipeline.addLast("userHandler", parentHandler);
            }
    
            Channel channel = getFactory().newChannel(bossPipeline);
    
            // Wait until the future is available.
            ChannelFuture future = null;
            boolean interrupted = false;
            do {
                try {
                    future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            } while (future == null);
    
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
    
            // Wait for the future.
            future.awaitUninterruptibly();
            if (!future.isSuccess()) {
                future.getChannel().close().awaitUninterruptibly();
                throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
            }
    
            return channel;
        }
    

    其中有段代码:

       do {
                try {
                    future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            } while (future == null);
    

    futureQueue会阻塞获取channel上的可读数据,如果有数据达到,那么就唤醒监听线程,调用decode对数据进行解码。

    bind前面还有段代码:

         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    /*int idleTimeout = getIdleTimeout();
                    if (idleTimeout > 10000) {
                        pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                    }*/
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
            // bind
            channel = bootstrap.bind(getBindAddress());
    

    pipeline会设置channel的处理器链。在接收请求后,decoder结束后,下个处理器是handler。然后下面就分析,nettyHandler。

    2.1 服务提供者暴露服务的Server初始化过程

    下面以dubbo为例:
    DubboProtocol.export方法的源码如下:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
    
            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
    
            //export an stub service for dispatching event
            Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice) {
                String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                                "], has set stubproxy support event ,but no stub methods founded."));
                    }
                } else {
                    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
                }
            }
    
            openServer(url);
            optimizeSerialization(url);
            return exporter;
        }
    

    创建server的核心方法在openServer,如下:

       private void openServer(URL url) {
            // find server.
            String key = url.getAddress();
            //client can export a service which's only for server to invoke
            boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
            if (isServer) {
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                } else {
                    // server supports reset, use together with override
                    server.reset(url);
                }
            }
        }
    

    核心方法在createServer,如下:

    private ExchangeServer createServer(URL url) {
            // send readonly event when server closes, it's enabled by default
            url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
            // enable heartbeat by default
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
            String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            ExchangeServer server;
            try {
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
            str = url.getParameter(Constants.CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
            return server;
        }
    

    核心方法在Exchangers.bind方法,其中requestHandler是dubboProtocol类内部重写的内部类。它是提供者bean接收方法要处理的最底层处理逻辑。下面看看它的核心方法,reply方法,如下:

    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    Invoker<?> invoker = getInvoker(channel, inv);
                    // need to consider backward-compatibility if it's a callback
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods) {
                                if (inv.getMethodName().equals(method)) {
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod) {
                            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                            return null;
                        }
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
    

    把message转换为dubbo 语义下的Invocation,该Invocation包含了调用方法,调用参数等等全面的信息,足够运行执行了。下面我们在看下这个real handler上面包装了哪些其他handler,并成为了一个怎样的过滤器链?继续看bind方法。一路追踪下去,看下HeaderExchanger的bind方法,如下:

      public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    

    看到第一个构造过滤器链的handler了,就是HeaderExchangeHandler。继续跟进Transporters.bind方法,如下:

     public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handlers == null || handlers.length == 0) {
                throw new IllegalArgumentException("handlers == null");
            }
            ChannelHandler handler;
            if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
            return getTransporter().bind(url, handler);
        }
    

    继续跟踪下去,到new NettyServer,handler还是上面的HeaderExchangeHandler。它的构造方法如下:

      public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    

    在创建nettyServer的时候,会调用ChannelHandlers.wrap方法,构造一个handler的过滤器模式。代码如下:

    public class ChannelHandlers {
    
        private static ChannelHandlers INSTANCE = new ChannelHandlers();
    
        protected ChannelHandlers() {
        }
    
        public static ChannelHandler wrap(ChannelHandler handler, URL url) {
            return ChannelHandlers.getInstance().wrapInternal(handler, url);
        }
    
        protected static ChannelHandlers getInstance() {
            return INSTANCE;
        }
    
        static void setTestingChannelHandlers(ChannelHandlers instance) {
            INSTANCE = instance;
        }
    
        protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
        }
    }
    

    可以看到wrap方法调用了wrapInternal方法。该方法,先new MultiMessageHandler,然后在HeartBeatHandler,再Dispatcher的Handler。而它是通过SPI去指定的。所以,这个过滤器链就构成了在执行的时候,按照从前向后的顺序,执行链。
    总结:整个过滤器链算是构造好了,MultiMessageHandler -> HeartBeatHandler -> AllDispatcher -> HeaderExchanger -> dubbo real handler
    MultiMessageHandler这些Handler都是使用了装饰器模式,对传入的handler进行了装饰行为。

    这一节分析,其实和第四节分析的很像。因为他们处理模型都是对等的。提供者received request和消费者received reponse,两者过程其实都很相似。都是把realHandler一层一层封装,最终给到netty的pepiline管道。

    三、提供者发送结果

    上面已经分析了提供者接收到nettp的tcp数据message后。给到MultiMessageHandler.received方法,然后在给到HeartBeatHandler.received,再给到AllChannelHandler.received。我们就来看下AllChannelHandler的received方法。如下:

    @Override
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService executorService = getExecutorService();
            try {
                executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                //TODO 临时解决线程池满后异常信息无法发送到对端的问题。待重构
                //fix 线程池满了拒绝调用不返回,导致消费者一直等待超时
                if(message instanceof Request && t instanceof RejectedExecutionException){
                    Request request = (Request)message;
                    if(request.isTwoWay()){
                        String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted, " +
                                "detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        }
    

    看到这行:

                executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    

    直接把netty线程传递过来的message传递给线程池的ChannelEventRunnable,然后就返回了。剩下的由dubbo线程池处理,这里如果dubbo线程池满了,是无法处理任何请求的。咱们看下ChannelEventRunnable的run方法,看看它到底干了啥:

     @Override
        public void run() {
            switch (state) {
                case CONNECTED:
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case DISCONNECTED:
                    try {
                        handler.disconnected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case SENT:
                    try {
                        handler.sent(channel, message);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                    }
                    break;
                case RECEIVED:
                    try {
                        handler.received(channel, message);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                    }
                    break;
                case CAUGHT:
                    try {
                        handler.caught(channel, exception);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is: " + message + ", exception is " + exception, e);
                    }
                    break;
                default:
                    logger.warn("unknown state: " + state + ", message is " + message);
            }
        }
    

    很明显,这次事件是接收请求,那么就是RECEIVED了。看到执行了handler.received方法。这个handler我们上面分析了,是DecodeHandler(初始化的时候指定的)。进去看下:

    public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Decodeable) {
                decode(message);
            }
    
            if (message instanceof Request) {
                decode(((Request) message).getData());
            }
    
            if (message instanceof Response) {
                decode(((Response) message).getResult());
            }
    
            handler.received(channel, message);
        }
    

    肯定会走到decode(request)这里。然后解码完毕后,执行handler。这里的handler是HeaderExchangeHandler,也是初始化就指定的了。进去看下它的received方法:

    public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // handle request.
                    Request request = (Request) message;
                    if (request.isEvent()) {
                        handlerEvent(channel, request);
                    } else {
                        if (request.isTwoWay()) {
                            Response response = handleRequest(exchangeChannel, request);
                            channel.send(response);
                        } else {
                            handler.received(exchangeChannel, request.getData());
                        }
                    }
                } else if (message instanceof Response) {
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    if (isClientSide(channel)) {
                        Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                        logger.error(e.getMessage(), e);
                    } else {
                        String echo = handler.telnet(channel, (String) message);
                        if (echo != null && echo.length() > 0) {
                            channel.send(echo);
                        }
                    }
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    

    对于request,最终会走到:handler.received(exchangeChannel, request.getData())这里。
    继续handler,这个handler就是dubboProtocol的内部匿名类requestHandler:

    @Override
            public void received(Channel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    reply((ExchangeChannel) channel, message);
                } else {
                    super.received(channel, message);
                }
            }
    

    继续看reply方法:

     @Override
            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    Invoker<?> invoker = getInvoker(channel, inv);
                    //如果是callback 需要处理高版本调用低版本的问题
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || !methodsStr.contains(",")) {
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods) {
                                if (inv.getMethodName().equals(method)) {
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod) {
                            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                            return null;
                        }
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
    

    getInvoke得到实现类的代理类,然后调用实现类的代理类,执行真正的业务逻辑得到结果。那么这个结果,怎么发送给消费者呢?猜测肯定是用send方法发送。但是在哪里呢?
    答案就在我们上面看到的HeaderExchangeHandler的received方法里面。再看下:

    public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // handle request.
                    Request request = (Request) message;
                    if (request.isEvent()) {
                        handlerEvent(channel, request);
                    } else {
                        if (request.isTwoWay()) {
                            Response response = handleRequest(exchangeChannel, request);
                            channel.send(response);
                        } else {
                            handler.received(exchangeChannel, request.getData());
                        }
                    }
                } else if (message instanceof Response) {
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    if (isClientSide(channel)) {
                        Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                        logger.error(e.getMessage(), e);
                    } else {
                        String echo = handler.telnet(channel, (String) message);
                        if (echo != null && echo.length() > 0) {
                            channel.send(echo);
                        }
                    }
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    

    看到如下代码:

     if (request.isTwoWay()) {
          Response response = handleRequest(exchangeChannel, request);
          channel.send(response);
             } else {
                 handler.received(exchangeChannel, request.getData());
                  }
    

    如果是双向通行,表示消费者正在等着结果呢。所以,这里handleRequest结束后,就要把response结果send出去。通过channel.send出去。下面我们就要分析这个channel是怎么初始化的,以及怎么传递进来的。
    这个channel是在NettyHandler收到message的时候生成的。
    NettyHandler.messageReceived方法的源码如下:

     @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            try {
                handler.received(channel, e.getMessage());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }
    

    这个channel肯定就是netty用来和消费者通信的socket。channel是流,里面有数据在流淌着,用户可以从这channel里面拿到数据。这个channel也提供发送接收数据的能力。 channel.send(response)方法,我们就知道了这个channel就是nettyChannel。源码如下:

    public void send(Object message, boolean sent) throws RemotingException {
            super.send(message, sent);
    
            boolean success = true;
            int timeout = 0;
            try {
                ChannelFuture future = channel.write(message);
                if (sent) {
                    timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                    success = future.await(timeout);
                }
                Throwable cause = future.getCause();
                if (cause != null) {
                    throw cause;
                }
            } catch (Throwable e) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                        + ", cause: " + e.getMessage() + ", may be graceful shutdown problem (2.5.3 or 3.1.*)"
                        + ", see http://git.caimi-inc.com/middleware/hokage/issues/14",
                        e);
            }
    
            if (!success) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                        + "in timeout(" + timeout + "ms) limit");
            }
        }
    

    只要通过channel.write(message),把response的数据write到channel就完事了。我们也知道channel是可读可写的

    到此结束,提供方接收请求 -> 处理请求 -> 发送请求,都全部分析结束。
    这里多一嘴,其实消费者调用请求的send也是走这里的。不信可以验证,为了简单起,我们直接定位到DubboInvoker的doInvoke方法:

    @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                } else if (isAsync) {
                    ResponseFuture future = currentClient.request(inv, timeout);
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {
                    RpcContext.getContext().setFuture(null);
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    看到最后一段代码:

     else {
             RpcContext.getContext().setFuture(null);
             return (Result) currentClient.request(inv, timeout).get();
                }
    

    追踪到HeaderExchangeChannel.request方法:

     public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            Request req = new Request();
            req.setVersion("2.0.0");
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }
    

    channel.send最后又到了nettyChannel.send方法。和刚刚发送结果的send是一模一样。都是往channel上write数据。这个数据可以是request也可以是response,对channel来说无所谓。它仅仅是双方用来通信的管道,里面是啥数据,两边是啥角色,channel都不关心。

    《完》

    四、消费者接收结果

    先看下消费者请求接收结果的堆栈。我们把断点设置到AllChannelHandler.received方法的第一行。如下:


    image.png

    重下往上看,一直到SimpleChannelHandler.handleUpstream都是netty的代码,在往上一层就是dubbo的代码了。可能有人会问,你为啥就知道断点到这里呢?我们看下消费者初始化过程就清楚了。这里是网络相关,我们猜想建立网络连接一定在消费者初始化client的时候。所以我们一下子就可以定位到DubboProtocol的refer方法中的getClients(url)。看下getClients方法,最终到initClient方法。看到:

    client = Exchangers.connect(url, requestHandler);
    

    好了,我们现在就要研究这个方法了。requestHandler肯定就是出站数据和入站数据,我们要如何处理的具体hanlder了。你可能有疑问?我们看下这个handler实现的接口好了,它实现了ChannelHandler接口,其中该接口有sent和received方法,就代表了出站和入站。还有不懂,自己去研究下就会了。
    进入connect源码看下:

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
            return getExchanger(url).connect(url, handler);
        }
    

    看到connect方法,继续向下:

     public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }
    

    这里我们看到realHandler先被HeaderExchangeHandler包装,再被DecodeHandler包装。有人这里会奇怪,这些handler在上面的堆栈也没出现啊。对,说明你已经明白了。既然上面堆栈没出现这里包装的类,那是不是推测Transporters.connect里面肯定又做了很多包装。下面我们来验证下,继续向下追踪。

    
        public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            ChannelHandler handler;
            if (handlers == null || handlers.length == 0) {
                handler = new ChannelHandlerAdapter();
            } else if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
            return getTransporter().connect(url, handler);
        }
    

    看到handler原封不动的传给了getTransporter().connect方法。我们继续推测,包装发生在Transporter.connect里面。继续追踪:

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    

    追踪到NettyTransporter,很简单。继续往下看:

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            super(url, wrapChannelHandler(url, handler));
        }
    

    是不是有点感觉了?wrapChannelHandler方法是不是做了包装。继续往下看:

     protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
            url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
            url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
            return ChannelHandlers.wrap(handler, url);
        }
    

    继续看ChannelHandlers.wrap(handler, url):

    public class ChannelHandlers {
    
        private static ChannelHandlers INSTANCE = new ChannelHandlers();
    
        protected ChannelHandlers() {
        }
    
        public static ChannelHandler wrap(ChannelHandler handler, URL url) {
            return ChannelHandlers.getInstance().wrapInternal(handler, url);
        }
    
        protected static ChannelHandlers getInstance() {
            return INSTANCE;
        }
    
        static void setTestingChannelHandlers(ChannelHandlers instance) {
            INSTANCE = instance;
        }
    
        protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
        }
    }
    

    warp方法调用了个单例,然后调用了wrapInternal。看看里面的handler,是不是很熟悉。最终传递给netty就是这个MultiMessageHandler,然后是HeartbeatHandler,里面那个动态SPI就是AllChannelHandler。可能有人会问为啥?我们就来看下dispather的Adaptive代理类长啥样子。如下:

    package com.alibaba.dubbo.remoting;
    
    import com.alibaba.dubbo.common.URL;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    
    public class Dispatcher$Adpative implements Dispatcher {
        public Dispatcher$Adpative() {
        }
    
        public ChannelHandler dispatch(ChannelHandler var1, URL var2) {
            if (var2 == null) {
                throw new IllegalArgumentException("url == null");
            } else {
                String var4 = var2.getParameter("dispatcher", var2.getParameter("dispather", var2.getParameter("channel.handler", "all")));
                if (var4 == null) {
                    throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + var2.toString() + ") use keys([dispatcher, dispather, channel.handler])");
                } else {
                    Dispatcher var5 = (Dispatcher)ExtensionLoader.getExtensionLoader(Dispatcher.class).getExtension(var4);
                    return var5.dispatch(var1, var2);
                }
            }
        }
    }
    

    看到all了没,它是默认配置,用javassist代理生成的。
    到这里只分析道MultiMessageHandler以后的handler,那么它前面的handler是怎么来的?
    继续new NettyClient:

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            super(url, wrapChannelHandler(url, handler));
        }
    

    这里super(url, MultiMessageHandler),进入super看下:

     public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
    
            send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    
            shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
    
            //默认重连间隔2s,1800表示1小时warning一次.
            reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
    
            try {
                doOpen();
            } catch (Throwable t) {
                close();
                throw new RemotingException(url.toInetSocketAddress(), null,
                        "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                                + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
            }
            try {
                // connect.
               .....//省略
        }
    

    看到doOpen方法,进去看下:

    @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            bootstrap = new ClientBootstrap(channelFactory);
            // config
            // @see org.jboss.netty.channel.socket.SocketChannelConfig
            bootstrap.setOption("keepAlive", true);
            bootstrap.setOption("tcpNoDelay", true);
            bootstrap.setOption("connectTimeoutMillis", getTimeout());
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
        }
    

    看到有句代码:

     final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    

    this就是当前NettyClient,它也是ChannelHandler的实现。它被NettyHandler包装起来,NettyHandler继承了然后就把它给了SimpleChannelHandler,最终传递给了netty的pipeline管道。现在终于把dubbo的hanlder和netty的handler衔接起来了。

    总结下handler的包装过程:
    DubboProtocol.requestHandler < HeaderExchangeHandler < DecodeHandler < AllChannelHandler < HeartbeatHandler < MultiMessageHandler < NettyHandler。我们猜测netty调用dubbo的第一个handler,一定是NettyHandler。在对比下上面的调用堆栈验证下,netty -> dubbo的第一个调用handler确实是nettyHandler。

    到这里就分析结束了。但是这里我要特别说明下,我当时看到这些包装链路的时候,想当然的就把断点设置到最里面那个handler,就是我们真正处理逻辑的handler。就是那个requestHandler,在DubboProtocol的内部类。如下:

    // 请求处理器
        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
            @Override
            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    Invoker<?> invoker = getInvoker(channel, inv);
                    //如果是callback 需要处理高版本调用低版本的问题
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || !methodsStr.contains(",")) {
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods) {
                                if (inv.getMethodName().equals(method)) {
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod) {
                            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                            return null;
                        }
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
    
            @Override
            public void received(Channel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    reply((ExchangeChannel) channel, message);
                } else {
                    super.received(channel, message);
                }
            }
    
            @Override
            public void connected(Channel channel) throws RemotingException {
                invoke(channel, Constants.ON_CONNECT_KEY);
            }
    
            @Override
            public void disconnected(Channel channel) throws RemotingException {
                if (logger.isInfoEnabled()) {
                    logger.info("disconnected from " + channel.getRemoteAddress() + ", url: " + channel.getUrl());
                }
                invoke(channel, Constants.ON_DISCONNECT_KEY);
            }
    
            private void invoke(Channel channel, String methodKey) {
                Invocation invocation = createInvocation(channel.getUrl(), methodKey);
                if (invocation != null) {
                    try {
                        received(channel, invocation);
                    } catch (Throwable t) {
                        logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                    }
                }
            }
    
            private Invocation createInvocation(URL url, String methodKey) {
                String method = url.getParameter(methodKey);
                if (method == null || method.length() == 0) {
                    return null;
                }
                RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
                invocation.setAttachment(Constants.PATH_KEY, url.getPath());
                invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
                invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
                invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
                if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                    invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
                }
                return invocation;
            }
        };
    

    它实现了ExchangeHandlerAdapter接口,里面当然也有received方法。我当时就把断点设置到这里,因为我觉得返回结果肯定最终会到达这里得到处理。结果就是我想错了,从它这里的实现也看得出,它也没法处理response。我们这里来分析下:
    上面调用堆栈看到现在请求结果已经到达了AllChannelHandler,看下上面我们分析的包装过程。下一个handler给到DecodeHandler。看下DecodeHandler:

    
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Decodeable) {
                decode(message);
            }
    
            if (message instanceof Request) {
                decode(((Request) message).getData());
            }
    
            if (message instanceof Response) {
                decode(((Response) message).getResult());
            }
    
            handler.received(channel, message);
        }
    

    看到解码后,继续执行handler的received方法。继续向下看,到HeaderExchangeHandler。

    public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // handle request.
                    Request request = (Request) message;
                    if (request.isEvent()) {
                        handlerEvent(channel, request);
                    } else {
                        if (request.isTwoWay()) {
                            Response response = handleRequest(exchangeChannel, request);
                            channel.send(response);
                        } else {
                            handler.received(exchangeChannel, request.getData());
                        }
                    }
                } else if (message instanceof Response) {
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    if (isClientSide(channel)) {
                        Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                        logger.error(e.getMessage(), e);
                    } else {
                        String echo = handler.telnet(channel, (String) message);
                        if (echo != null && echo.length() > 0) {
                            channel.send(echo);
                        }
                    }
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    

    看到message instanceof Response后,里面的handleResponse。进去看下:

    static void handleResponse(Channel channel, Response response) throws RemotingException {
            if (response != null && !response.isHeartbeat()) {
                DefaultFuture.received(channel, response);
            }
        }
    

    重点来了,它压根就没把reponse委托给下面的requestHandler,自己给处理了。。。我想当然的以为一定会传递给最后一个handler处理呢,没想到到倒数第二个handler后自己给处理完了,不向下传递了。所以解决了我在dubboProtocol那里断点,死活断点不上(异步线程的断点不在这里讨论)。

    相关文章

      网友评论

          本文标题:dubbo的调用过程

          本文链接:https://www.haomeiwen.com/subject/uzeevctx.html