美文网首页
dubbo的调用过程

dubbo的调用过程

作者: 7d972d5e05e8 | 来源:发表于2019-10-25 20:23 被阅读0次

一、消费者发起请求

1.1 调用入口

在@Reference注入的bean的invoke方法,即Invoker.invoke。

public interface Invoker<T> extends Node {
    /**
     * get service interface.
     *
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * invoke.
     *
     * @param invocation
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;
}

然后依次调用的路径:
MockClusterInvoker.invoke -> //用来支持mock
AbstractClusterInvoker.invoke -> //用来加载指定的负载均衡策略
FailoverClusterInvoker.doInvoke -> //用来负载均衡选择具体哪个提供者的invoker
AbstractInvoker.invoke -> 用来初始化一些数据
DubboInvoker.doInvoke -> 用来执行invoke调用
HeaderExchangeClient.request -> 用来执行网络调用请求
HeaderExchangeChannel.request -> 用来执行封装Request请求
AbstractPeer.send ->
NettyChannel.send ->
暂时到这里。想一下为啥会上上面的调用路径呢?因为真正的DubboInvoke会被包装为很多层。比如为了满足服务治理:使用FailoverClusterInvoker,为了满足mock使用MockClusterInvoker,为了满足过滤器,使用ProtocolFilterWrapper这里没有显示出来,等等。
这里从DubboInvoker的doInvoke开始看,源码如下:

 @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

可以看到,正常调用都是双向的,且是同步的。所以会走到else里面。如下:

 RpcContext.getContext().setFuture(null);
 return (Result) currentClient.request(inv, timeout).get();

这里有个关键的地方:用户线程在发送完request请求后,使用get()方法阻塞本次调用的用户线程,等待ResponseFuture的返回。而该ResponseFuture的实现类DefaultFuture。下面看下DefaultFuture的get方法:

 public Object get() throws RemotingException {
        return get(timeout);
    }

    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

明显可以看到!isDone的话,线程await超时时间,阻塞这里等待回来的数据。用户线程到这里就结束了。等待request返回,并唤醒该线程。
下面我看下request方法:

public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

到这里看到future=new DefaultFuture()了。然后调用了channel.send(req),后面就是netty的框架了,下节会介绍。

1.2 消费者调用过程---NIO发送请求

这里主要是介绍:NettyChannel.send方法:

public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.write(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }

        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

追踪下去,到了:

  public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
        ChannelFuture future = future(channel);
        channel.getPipeline().sendDownstream(
                new DownstreamMessageEvent(channel, future, message, remoteAddress));
        return future;
    }

目前到这里,我们的调用路径如下:


dubbo调用栈1.png

Channels.write方法源码如上,调用了sendDownstream方法。在追踪下去,sendDownstream源码如下:

 void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        if (e instanceof UpstreamMessageEvent) {
            throw new IllegalArgumentException("cannot send an upstream event to downstream");
        }
        
        try {
            ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
        } catch (Throwable t) {
            // Unlike an upstream event, a downstream event usually has an
            // incomplete future which is supposed to be updated by ChannelSink.
            // However, if an exception is raised before the event reaches at
            // ChannelSink, the future is not going to be updated, so we update
            // here.
            e.getFuture().setFailure(t);
            notifyHandlerException(e, t);
        }
    }

可以看到handleDownstream方法,由指定handler执行。这里使用OneToOneEncoder执行:

 public void handleDownstream(
            ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
        if (!(evt instanceof MessageEvent)) {
            ctx.sendDownstream(evt);
            return;
        }

        MessageEvent e = (MessageEvent) evt;
        Object originalMessage = e.getMessage();
        Object encodedMessage = encode(ctx, e.getChannel(), originalMessage);
        if (originalMessage == encodedMessage) {
            ctx.sendDownstream(evt);
        } else if (encodedMessage != null) {
            write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
        }
    }

encode方法就是模板方法了,实现由子类来实现。然后,追踪到在NettyCodecAdapter里面,有个内部类InternalEncoder实现了encode方法。


    @Sharable
    private class InternalEncoder extends OneToOneEncoder {

        @Override
        protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                    com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
            return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
        }
    }

最后,这里的codec.encode(channel, buffer, msg)委托给了DubboCountCodec.encode
总结:

这节内容把消费者的send请求和编码,序列化等底层操作结合起来了。不能追踪到netty层就不向下了,其实dubbo拓展了很多netty的类。导致虽然调用已经走到netty框架,但是很多业务处理,netty还需要回调netty拓展的功能。这种细节还是不能马虎,需要搞懂。举个例子:org.jboss.netty.handler.codec.oneone.OneToOneEncoder是netty的抽象类,必须由子类实现,然后netty在调用的时候,会调用dubbo实现的子类。这里是InternalEncoder类。环环相扣=_=

紧接着上面的encode结束后,调用了:write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress())方法,继续追踪:

 public void sendDownstream(ChannelEvent e) {
            DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
            if (prev == null) {
                try {
                    getSink().eventSunk(DefaultChannelPipeline.this, e);
                } catch (Throwable t) {
                    notifyHandlerException(e, t);
                }
            } else {
                DefaultChannelPipeline.this.sendDownstream(prev, e);
            }
        }

该方法属于DefaultChannelPipeline.java类,prev必然为null,然后调用 getSink().eventSunk(DefaultChannelPipeline.this, e)方法,继续追踪到NioClientSocketPipelineSink类,如下:

 public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        if (e instanceof ChannelStateEvent) {
            ChannelStateEvent event = (ChannelStateEvent) e;
            NioClientSocketChannel channel =
                (NioClientSocketChannel) event.getChannel();
            ChannelFuture future = event.getFuture();
            ChannelState state = event.getState();
            Object value = event.getValue();

            switch (state) {
            case OPEN:
                if (Boolean.FALSE.equals(value)) {
                    channel.worker.close(channel, future);
                }
                break;
            case BOUND:
                if (value != null) {
                    bind(channel, future, (SocketAddress) value);
                } else {
                    channel.worker.close(channel, future);
                }
                break;
            case CONNECTED:
                if (value != null) {
                    connect(channel, future, (SocketAddress) value);
                } else {
                    channel.worker.close(channel, future);
                }
                break;
            case INTEREST_OPS:
                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                break;
            }
        } else if (e instanceof MessageEvent) {
            MessageEvent event = (MessageEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            boolean offered = channel.writeBuffer.offer(event);
            assert offered;
            channel.worker.writeFromUserCode(channel);
        }
    }

到了这里,终于看到我们想要的事件了。这里根据事件类型选择了MessageEvent。看到调用了writeFromUserCode方法,在后面就只有netty的代码了,调用了write0方法。这里就不在向下追踪了,有兴趣的可以自己看netty。
调用链路如下,紧接着上面的图:


dubb调用栈2.png

总结:到了writeFromUserCode这里,总算把dubbo请求发送给了网络。剩下就是网络包通过TCP/IP协议,传到提供者ip那里去了。在消费者发送请求的过程中,一直使用了是一个线程,在线程执行完request的send操作后,同步得到一个Future,然后一直阻塞在Future.get()方法上,等待返回值。其实这个调用过程抽象出来就是:
1.服务治理选出一个最佳的提供者ip
2.执行invoker的各种filter,类似AOP功能
3.构造请求request,然后encode请求参数,便于网络传输
4.把序列化好后的二进制流传递给netty
5.netty把数据发送到网络上

二、提供者接收请求

参考文档:http://dubbo.apache.org/zh-cn/docs/source_code_guide/service-invoking-process.html

提供者会一直在dubbo指定的端口上,while(true)监听channel中有没有数据达到。通过netty的reactor模式,netty的IO线程监听有数据达到,然后看是什么事件,select唤起对应的事件处理器。事件处理,由单独的线程池去完成。先看看,在提供者export服务的时候,bind的代码:

 public Channel bind(final SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }

        final BlockingQueue<ChannelFuture> futureQueue =
            new LinkedBlockingQueue<ChannelFuture>();

        ChannelHandler binder = new Binder(localAddress, futureQueue);
        ChannelHandler parentHandler = getParentHandler();

        ChannelPipeline bossPipeline = pipeline();
        bossPipeline.addLast("binder", binder);
        if (parentHandler != null) {
            bossPipeline.addLast("userHandler", parentHandler);
        }

        Channel channel = getFactory().newChannel(bossPipeline);

        // Wait until the future is available.
        ChannelFuture future = null;
        boolean interrupted = false;
        do {
            try {
                future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        } while (future == null);

        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        // Wait for the future.
        future.awaitUninterruptibly();
        if (!future.isSuccess()) {
            future.getChannel().close().awaitUninterruptibly();
            throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
        }

        return channel;
    }

其中有段代码:

   do {
            try {
                future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        } while (future == null);

futureQueue会阻塞获取channel上的可读数据,如果有数据达到,那么就唤醒监听线程,调用decode对数据进行解码。

bind前面还有段代码:

     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());

pipeline会设置channel的处理器链。在接收请求后,decoder结束后,下个处理器是handler。然后下面就分析,nettyHandler。

2.1 服务提供者暴露服务的Server初始化过程

下面以dubbo为例:
DubboProtocol.export方法的源码如下:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

创建server的核心方法在openServer,如下:

   private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

核心方法在createServer,如下:

private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

核心方法在Exchangers.bind方法,其中requestHandler是dubboProtocol类内部重写的内部类。它是提供者bean接收方法要处理的最底层处理逻辑。下面看看它的核心方法,reply方法,如下:

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

把message转换为dubbo 语义下的Invocation,该Invocation包含了调用方法,调用参数等等全面的信息,足够运行执行了。下面我们在看下这个real handler上面包装了哪些其他handler,并成为了一个怎样的过滤器链?继续看bind方法。一路追踪下去,看下HeaderExchanger的bind方法,如下:

  public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

看到第一个构造过滤器链的handler了,就是HeaderExchangeHandler。继续跟进Transporters.bind方法,如下:

 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

继续跟踪下去,到new NettyServer,handler还是上面的HeaderExchangeHandler。它的构造方法如下:

  public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

在创建nettyServer的时候,会调用ChannelHandlers.wrap方法,构造一个handler的过滤器模式。代码如下:

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

可以看到wrap方法调用了wrapInternal方法。该方法,先new MultiMessageHandler,然后在HeartBeatHandler,再Dispatcher的Handler。而它是通过SPI去指定的。所以,这个过滤器链就构成了在执行的时候,按照从前向后的顺序,执行链。
总结:整个过滤器链算是构造好了,MultiMessageHandler -> HeartBeatHandler -> AllDispatcher -> HeaderExchanger -> dubbo real handler
MultiMessageHandler这些Handler都是使用了装饰器模式,对传入的handler进行了装饰行为。

这一节分析,其实和第四节分析的很像。因为他们处理模型都是对等的。提供者received request和消费者received reponse,两者过程其实都很相似。都是把realHandler一层一层封装,最终给到netty的pepiline管道。

三、提供者发送结果

上面已经分析了提供者接收到nettp的tcp数据message后。给到MultiMessageHandler.received方法,然后在给到HeartBeatHandler.received,再给到AllChannelHandler.received。我们就来看下AllChannelHandler的received方法。如下:

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executorService = getExecutorService();
        try {
            executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO 临时解决线程池满后异常信息无法发送到对端的问题。待重构
            //fix 线程池满了拒绝调用不返回,导致消费者一直等待超时
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted, " +
                            "detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
        }
    }

看到这行:

            executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));

直接把netty线程传递过来的message传递给线程池的ChannelEventRunnable,然后就返回了。剩下的由dubbo线程池处理,这里如果dubbo线程池满了,是无法处理任何请求的。咱们看下ChannelEventRunnable的run方法,看看它到底干了啥:

 @Override
    public void run() {
        switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case RECEIVED:
                try {
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

很明显,这次事件是接收请求,那么就是RECEIVED了。看到执行了handler.received方法。这个handler我们上面分析了,是DecodeHandler(初始化的时候指定的)。进去看下:

public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }

肯定会走到decode(request)这里。然后解码完毕后,执行handler。这里的handler是HeaderExchangeHandler,也是初始化就指定的了。进去看下它的received方法:

public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

对于request,最终会走到:handler.received(exchangeChannel, request.getData())这里。
继续handler,这个handler就是dubboProtocol的内部匿名类requestHandler:

@Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

继续看reply方法:

 @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要处理高版本调用低版本的问题
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || !methodsStr.contains(",")) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

getInvoke得到实现类的代理类,然后调用实现类的代理类,执行真正的业务逻辑得到结果。那么这个结果,怎么发送给消费者呢?猜测肯定是用send方法发送。但是在哪里呢?
答案就在我们上面看到的HeaderExchangeHandler的received方法里面。再看下:

public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

看到如下代码:

 if (request.isTwoWay()) {
      Response response = handleRequest(exchangeChannel, request);
      channel.send(response);
         } else {
             handler.received(exchangeChannel, request.getData());
              }

如果是双向通行,表示消费者正在等着结果呢。所以,这里handleRequest结束后,就要把response结果send出去。通过channel.send出去。下面我们就要分析这个channel是怎么初始化的,以及怎么传递进来的。
这个channel是在NettyHandler收到message的时候生成的。
NettyHandler.messageReceived方法的源码如下:

 @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

这个channel肯定就是netty用来和消费者通信的socket。channel是流,里面有数据在流淌着,用户可以从这channel里面拿到数据。这个channel也提供发送接收数据的能力。 channel.send(response)方法,我们就知道了这个channel就是nettyChannel。源码如下:

public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.write(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + ", cause: " + e.getMessage() + ", may be graceful shutdown problem (2.5.3 or 3.1.*)"
                    + ", see http://git.caimi-inc.com/middleware/hokage/issues/14",
                    e);
        }

        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

只要通过channel.write(message),把response的数据write到channel就完事了。我们也知道channel是可读可写的

到此结束,提供方接收请求 -> 处理请求 -> 发送请求,都全部分析结束。
这里多一嘴,其实消费者调用请求的send也是走这里的。不信可以验证,为了简单起,我们直接定位到DubboInvoker的doInvoke方法:

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

看到最后一段代码:

 else {
         RpcContext.getContext().setFuture(null);
         return (Result) currentClient.request(inv, timeout).get();
            }

追踪到HeaderExchangeChannel.request方法:

 public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

channel.send最后又到了nettyChannel.send方法。和刚刚发送结果的send是一模一样。都是往channel上write数据。这个数据可以是request也可以是response,对channel来说无所谓。它仅仅是双方用来通信的管道,里面是啥数据,两边是啥角色,channel都不关心。

《完》

四、消费者接收结果

先看下消费者请求接收结果的堆栈。我们把断点设置到AllChannelHandler.received方法的第一行。如下:


image.png

重下往上看,一直到SimpleChannelHandler.handleUpstream都是netty的代码,在往上一层就是dubbo的代码了。可能有人会问,你为啥就知道断点到这里呢?我们看下消费者初始化过程就清楚了。这里是网络相关,我们猜想建立网络连接一定在消费者初始化client的时候。所以我们一下子就可以定位到DubboProtocol的refer方法中的getClients(url)。看下getClients方法,最终到initClient方法。看到:

client = Exchangers.connect(url, requestHandler);

好了,我们现在就要研究这个方法了。requestHandler肯定就是出站数据和入站数据,我们要如何处理的具体hanlder了。你可能有疑问?我们看下这个handler实现的接口好了,它实现了ChannelHandler接口,其中该接口有sent和received方法,就代表了出站和入站。还有不懂,自己去研究下就会了。
进入connect源码看下:

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

看到connect方法,继续向下:

 public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

这里我们看到realHandler先被HeaderExchangeHandler包装,再被DecodeHandler包装。有人这里会奇怪,这些handler在上面的堆栈也没出现啊。对,说明你已经明白了。既然上面堆栈没出现这里包装的类,那是不是推测Transporters.connect里面肯定又做了很多包装。下面我们来验证下,继续向下追踪。


    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }

看到handler原封不动的传给了getTransporter().connect方法。我们继续推测,包装发生在Transporter.connect里面。继续追踪:

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

追踪到NettyTransporter,很简单。继续往下看:

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }

是不是有点感觉了?wrapChannelHandler方法是不是做了包装。继续往下看:

 protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }

继续看ChannelHandlers.wrap(handler, url):

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

warp方法调用了个单例,然后调用了wrapInternal。看看里面的handler,是不是很熟悉。最终传递给netty就是这个MultiMessageHandler,然后是HeartbeatHandler,里面那个动态SPI就是AllChannelHandler。可能有人会问为啥?我们就来看下dispather的Adaptive代理类长啥样子。如下:

package com.alibaba.dubbo.remoting;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Dispatcher$Adpative implements Dispatcher {
    public Dispatcher$Adpative() {
    }

    public ChannelHandler dispatch(ChannelHandler var1, URL var2) {
        if (var2 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            String var4 = var2.getParameter("dispatcher", var2.getParameter("dispather", var2.getParameter("channel.handler", "all")));
            if (var4 == null) {
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + var2.toString() + ") use keys([dispatcher, dispather, channel.handler])");
            } else {
                Dispatcher var5 = (Dispatcher)ExtensionLoader.getExtensionLoader(Dispatcher.class).getExtension(var4);
                return var5.dispatch(var1, var2);
            }
        }
    }
}

看到all了没,它是默认配置,用javassist代理生成的。
到这里只分析道MultiMessageHandler以后的handler,那么它前面的handler是怎么来的?
继续new NettyClient:

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }

这里super(url, MultiMessageHandler),进入super看下:

 public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);

        send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

        shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);

        //默认重连间隔2s,1800表示1小时warning一次.
        reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

        try {
            doOpen();
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }
        try {
            // connect.
           .....//省略
    }

看到doOpen方法,进去看下:

@Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

看到有句代码:

 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);

this就是当前NettyClient,它也是ChannelHandler的实现。它被NettyHandler包装起来,NettyHandler继承了然后就把它给了SimpleChannelHandler,最终传递给了netty的pipeline管道。现在终于把dubbo的hanlder和netty的handler衔接起来了。

总结下handler的包装过程:
DubboProtocol.requestHandler < HeaderExchangeHandler < DecodeHandler < AllChannelHandler < HeartbeatHandler < MultiMessageHandler < NettyHandler。我们猜测netty调用dubbo的第一个handler,一定是NettyHandler。在对比下上面的调用堆栈验证下,netty -> dubbo的第一个调用handler确实是nettyHandler。

到这里就分析结束了。但是这里我要特别说明下,我当时看到这些包装链路的时候,想当然的就把断点设置到最里面那个handler,就是我们真正处理逻辑的handler。就是那个requestHandler,在DubboProtocol的内部类。如下:

// 请求处理器
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要处理高版本调用低版本的问题
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || !methodsStr.contains(",")) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isInfoEnabled()) {
                logger.info("disconnected from " + channel.getRemoteAddress() + ", url: " + channel.getUrl());
            }
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };

它实现了ExchangeHandlerAdapter接口,里面当然也有received方法。我当时就把断点设置到这里,因为我觉得返回结果肯定最终会到达这里得到处理。结果就是我想错了,从它这里的实现也看得出,它也没法处理response。我们这里来分析下:
上面调用堆栈看到现在请求结果已经到达了AllChannelHandler,看下上面我们分析的包装过程。下一个handler给到DecodeHandler。看下DecodeHandler:


    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }

看到解码后,继续执行handler的received方法。继续向下看,到HeaderExchangeHandler。

public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

看到message instanceof Response后,里面的handleResponse。进去看下:

static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

重点来了,它压根就没把reponse委托给下面的requestHandler,自己给处理了。。。我想当然的以为一定会传递给最后一个handler处理呢,没想到到倒数第二个handler后自己给处理完了,不向下传递了。所以解决了我在dubboProtocol那里断点,死活断点不上(异步线程的断点不在这里讨论)。

相关文章

  • Java进阶-Dubbo-进阶

    一、服务调用过程 1.1 服务调用方式   Dubbo 服务调用过程:   Dubbo 支持同步和异步两种调用方式...

  • Dubbo调用过程

    一个微服务主要有三个角色:服务提供者、服务的消费者、服务注册中心(服务代理)。 服务注册 服务,如用户服务、库存服...

  • Dubbo的异步调用

    Dubbo调用的过程会有网络延时,处理耗时等,如果业务上对于调用结果并不实时依赖,可以使用异步调用的方式Dubbo...

  • dubbo的调用过程

    一、消费者发起请求 1.1 调用入口 在@Reference注入的bean的invoke方法,即Invoker.i...

  • Dubbo的调用栈详细介绍

    Dubbo framework示意图 整个调用过程 (1)protocol 远程调用层:封装 RPC 调用,以 I...

  • Dubbo夺命17连问

    目录 1.Dubbo是什么?RPC又是什么? 2. Dubbo能做什么? 3.能说下Dubbo的总体的调用过程吗?...

  • dubbo的学习笔记

    最近面试复习dubbo随便写写 *首先什么是dubbo?dubbo是阿里开源的一个RPC(远程过程调用,是一种支持...

  • dubbo Serialized class org.apach

    在使用dubbo实现远程调用的过程中,出现了这个错误,原因是,dubbo远程调用的参数类都必须实现序列化。在项目中...

  • Dubbo调用过程分析

    文档目的: 公司目前使用的dubbo版本是2.6.2,看完dubbo官方文档中的一些功能,所以就想知道dubbo调...

  • dubbo的服务调用过程

    服务消费的过程:referenceConfig类的init方法调用Protocol的refer方法,生成invok...

网友评论

      本文标题:dubbo的调用过程

      本文链接:https://www.haomeiwen.com/subject/uzeevctx.html