美文网首页
dubbo consumer端发送request与接收respo

dubbo consumer端发送request与接收respo

作者: 雕兄L | 来源:发表于2019-06-23 11:43 被阅读0次
本篇内容主要分析client端发送的request和接收的response是如何在Exchanger层和Transport层流转。
  • 如何发送request请求

在执行request时,是从DubboProtocol的refer()方法里拿到一个DubboInvoker实例,然后request就从protocol层进入exchanger层。DubboInvoker里的client其实是一个被包装了很多层的对象。首先从ReferenceCountExchangeClient说起,ReferenceCountExchangeClient拿到请求后把请求直接转发给了HeaderExchangeClient

@Override
   protected Result doInvoke(final Invocation invocation) throws Throwable {
       RpcInvocation inv = (RpcInvocation) invocation;
       final String methodName = RpcUtils.getMethodName(invocation);
       inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
       inv.setAttachment(Constants.VERSION_KEY, version);

       ExchangeClient currentClient;
       if (clients.length == 1) {
           currentClient = clients[0];
       } else {
           currentClient = clients[index.getAndIncrement() % clients.length];
       }
       try {
           boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
           boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
           int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
           if (isOneway) {
               boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
               currentClient.send(inv, isSent);
               RpcContext.getContext().setFuture(null);
               return new RpcResult();
           } else if (isAsync) {
               ResponseFuture future = currentClient.request(inv, timeout);
               RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
               return new RpcResult();
           } else {
               RpcContext.getContext().setFuture(null);
               //client开始执行的地方
               return (Result) currentClient.request(inv, timeout).get();
           }
       } catch (TimeoutException e) {
           throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
       } catch (RemotingException e) {
           throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
       }
   }

HeaderExchangeClient是把请求转发给了channel,通过构造方法可以看到,其channel实例是exchanger层的HeaderExchangeChannel

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
     if (client == null) {
         throw new IllegalArgumentException("client == null");
     }
     this.client = client;
     //channel实例化 注意client被传递给了HeaderExchangeChannel
     this.channel = new HeaderExchangeChannel(client);
     String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
     this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
     this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
     if (heartbeatTimeout < heartbeat * 2) {
         throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
     }
     if (needHeartbeat) {
         startHeartbeatTimer();
     }
 }

再来到HeaderExchangeChannel的request方法,发现是又转给了channel.send(),那么我们可以推测,这个channel应该就是transport层的channel了,其实这channel就是之前提到的NettyClient,这里request从exchange层流转到了transport层

public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

NettyClient继承的类有点多,就不一一列列举了,具体就是从父类AbstractPeer的send方法一直执行到AbstractClient的send方法,AbstractClient的send方法如下:

public void send(Object message, boolean sent) throws RemotingException {
     if (send_reconnect && !isConnected()) {
         connect();
     }
     //注意这里有一个getChannel方法,是子类需要实现的方法,
     //咋们看下NettyClient的实现就知道什么意思了
     Channel channel = getChannel();
     //TODO Can the value returned by getChannel() be null? need improvement.
     if (channel == null || !channel.isConnected()) {
         throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
     }
     channel.send(message, sent);
 }

NettyClient的getChannel()实现可以看到,是直接把NettyClient的channel成员变量包装了下返回了,这个channel就是Netty的原生channel对象。

@Override
protected com.alibaba.dubbo.remoing.Channel getChannel() {
  //这里把connect()方法建立的channel对象赋值过来(SocketChannel)
    Channel c = channel;
    if (c == null || !c.isConnected())
        return null;
    return NettyChannel.getOrAddChannel(c, getUrl(), this);
}

感觉挺绕的,到了这里又多出来个NettyChannel,NettyChannel的getOrAddChannel如下代码所示,是用NettyChannel对象包装了Netty原生channel,然后以Netty原生的channel对象为key,存起来了。NettyChannel对应原生的Channel对象,一个client与server之间拥有一个NettyChannel对象,从这里可以先预判,各个client与server之间交互是保持长连接的。这个NettyChannel实现了dubbo的channel接口,主要实现了写事件。

static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
     if (ch == null) {
         return null;
     }
     NettyChannel ret = channelMap.get(ch);
     if (ret == null) {
         NettyChannel nc = new NettyChannel(ch, url, handler);
         if (ch.isConnected()) {
             ret = channelMap.putIfAbsent(ch, nc);
         }
         if (ret == null) {
             ret = nc;
         }
     }
     return ret;
 }
 public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
      //原生的Netty写事件,把数据写入channel,即开始向远端服务发送请求
        ChannelFuture future = channel.write(message);
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

回到AbstractClient的send方法里,拿到channel之后,又执行了channel的send方法,其实就是执行了NettyChannel的send方法,看上面的代码可以看到,是调用了Netty原生的channel把数据写出去了.

到这里,request就已经发送出去了。感觉dubbo发送个请求真的是太绕了,绕晕了有没有.


  • 如何处理返回的Response结果

发送完request之后还得处理返回的response,response的处理应该由handler来完成,即应该由底层io捕捉读事件,把收到的response传到transport层。NettyHanler是处理Netty读写事件的handler,在NettyClient里初始化NettyHandler拦截器时,把自己传递给了NettyHandler。所以NettyClient本身也充当了一个handler角色,是一个包装了其他handler的handler。
回过头再来看下NettyClient的初始化化逻辑,构造方法里有一段逻辑调用了父类的wrapChannelHandler()方法,通过方法名可以看出,这是在初始化父类的handler实例,并且这个handler被包装了。

public class NettyClient extends AbstractClient {

    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    // ChannelFactory's closure has a DirectMemory leak, using static to avoid
    // https://issues.jboss.org/browse/NETTY-424
    private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
            Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
            Constants.DEFAULT_IO_THREADS);
    private ClientBootstrap bootstrap;

    private volatile Channel channel; // volatile, please copy reference to use

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
      //执行handler包装初始化逻辑
        super(url, wrapChannelHandler(url, handler));
    }

    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        //初始化NettyHandler时,把这个NettyClient实例传递了过去
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                //把NettyHandler加入拦截链,由这个handler处理netty的各种读写事件
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

    ....省略掉部分代码
    @Override
    protected org.apache.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
}

进入wrapChannelHandler()方法,最后看到的是这样一段逻辑:

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
      //初始化代码
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
      }
    }

那么现在的情况是NettyClient包装了MultiMessageHandler,MultiMessageHandler包装了HeartbeatHandler,HeartbeatHandler又把请求代理给了Dispatcher,Dispatcher大概有AllDispatcher,DirectDispatcher,ExecutionDispatcher,MessageOnlyDispatcher 几个实现,默认使用的是AllDispatcher,即把请求代理给了AllChannelHandler,回头再来看AllChannelHandler的实现细节。

再看下NettyClient的继承关系:NettyClient extend AbstractClient extend AbstractEndpoint extend AbstractPeer,通过查看多个父类,可以看到是AbstractPeer的初始逻辑

public abstract class AbstractPeer implements Endpoint, ChannelHandler {

    private final ChannelHandler handler;

    private volatile URL url;

    // closing closed means the process is being closed and close is finished
    private volatile boolean closing;

    private volatile boolean closed;

    public AbstractPeer(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        //赋值包装完的handler
        this.handler = handler;
      }

      @Override
       public void received(Channel ch, Object msg) throws RemotingException {
           if (closed) {
               return;
           }
           //收到消息后,也是直接转给了handler处理
           handler.received(ch, msg);
       }

把client的handler初始化逻辑说完了,再回到NettyHandler的receive()方法:

@Override
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
       NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
       try {
           handler.received(channel, e.getMessage());
       } finally {
           NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
       }
   }

可以看到在收到Netty的读事件后,是直接把请求又转发给了handler,也就是NettyClient的receive()方法去处理,也就是其父类的receive方法

到这一步等于收到的response被从底层被传递到了transport层和exchanger层。
因为AbstractPeer的handler实例是MultiMessageHandler,所以请求到了MultiMessageHandler,这个handler重写了receive方法,看样子是实现组合消息的解码,暂时不知道如何使用,这里直接跳过,进入到HeartbeatHanler .

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }

HeartbeatHanler extends AbstractChannelHandlerDelegate, 是判断收到的请求是否与心跳相关,如果是心跳request,则回复消息,如果是心跳response,则消息到此结束,不再回复.

@Override
   public void received(Channel channel, Object message) throws RemotingException {
       setReadTimestamp(channel);
       if (isHeartbeatRequest(message)) {
           Request req = (Request) message;
           if (req.isTwoWay()) {
             //需要回复心跳的情况,回复消息(注意要把消息id带上)
               Response res = new Response(req.getId(), req.getVersion());
               res.setEvent(Response.HEARTBEAT_EVENT);
               channel.send(res);
               if (logger.isInfoEnabled()) {
                   int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                   if (logger.isDebugEnabled()) {
                       logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                               + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                               + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                   }
               }
           }
           return;
       }
       if (isHeartbeatResponse(message)) {
           if (logger.isDebugEnabled()) {
               logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
           }
           return;
       }
       //不是心跳 最后接着转发给下一个handler
       handler.received(channel, message);

HeartbeatHanler的handler实例是AllChannelHandler,到这里就进入了dispatcher的逻辑了,前面已经介绍过了,Dispatcher由AllChannelHandler来实现,这个handler的作用就是同步转异步,把请求异步转发给下一个handler实例处理(ChannelEventRunnable内部实现就是转发handler事件),之前的几个handler是由Netty的线程执行在执行,到这一步Netty的线程就执行完毕进入空闲状态。到这一步也可以看到,收到response后,Netty的Channel是没有被关闭的,验证了之前长连接通信的猜想。

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

AllChannelHandlerhandler实例是DecodeHandler extends AbstractChannelHandlerDelegate,AbstractChannelHandlerDelegate没有具体的代码实现,就是简单转发请求。
直接看DecodeHandler的实现,也是代理了handler请求,加入了解码功能,debug下了收到Response时解码情况,收到response后拿出DecodeableRpcResult进行解码,不过之前已经有解码过了,是在Netty的原生handler链里加入了Code2解码类进行解码,所以这里其实没有再解码,应该是一个扩展功能,另外考虑到这是Netty实现的上层,估计是一个整体的抽象,可能会用到解码代码.

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
          //取出DecodeableRpcResult 进行解码
            decode(((Response) message).getResult());
        }
        //请求再转发
        handler.received(channel, message);
    }

    private void decode(Object message) {
        if (message != null && message instanceof Decodeable) {
            try {
                ((Decodeable) message).decode();
                if (log.isDebugEnabled()) {
                    log.debug("Decode decodeable message " + message.getClass().getName());
                }
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            } // ~ end of catch
        } // ~ end of if
    } // ~ end of method decod
}

在上一篇文章DubboProtocol分析中已经知道,DecodeHandler的handler实例是HeaderExchangeHandler,再看HeaderExchangeHandler的实现,该类实现了Channel的所有事件处理,到这一步数据已经从Transport层流回流到了Exchange层。HeaderExchangeHandler是一个比较重要的类,把其中两个重要的方法贴出来:

public void received(Channel channel, Object message) throws RemotingException {
       channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
       ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
       try {
           if (message instanceof Request) {
               // handle request.
               Request request = (Request) message;
               //事件指的应该就是心跳
               if (request.isEvent()) {
                   handlerEvent(channel, request);
               } else {
                 //双向请求,指的就是需要回复的请求
                   if (request.isTwoWay()) {
                       Response response = handleRequest(exchangeChannel, request);
                       channel.send(response);
                   } else {
                       handler.received(exchangeChannel, request.getData());
                   }
               }
           } else if (message instanceof Response) {
             //处理收到的response
               handleResponse(channel, (Response) message);
           } else if (message instanceof String) {
               if (isClientSide(channel)) {
                   Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                   logger.error(e.getMessage(), e);
               } else {
                   String echo = handler.telnet(channel, (String) message);
                   if (echo != null && echo.length() > 0) {
                       channel.send(echo);
                   }
               }
           } else {
               handler.received(exchangeChannel, message);
           }
       } finally {
           HeaderExchangeChannel.removeChannelIfDisconnected(channel);
       }
   }
//处理收到的request请求
   Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
     //构造response对象,把request的id赋值上,对请求者来说,这是一个标识
       Response res = new Response(req.getId(), req.getVersion());
       //这个不知道什么情况会跳到这里
       if (req.isBroken()) {
           Object data = req.getData();

           String msg;
           if (data == null) msg = null;
           else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
           else msg = data.toString();
           res.setErrorMessage("Fail to decode request due to: " + msg);
           res.setStatus(Response.BAD_REQUEST);

           return res;
       }
       // find handler by message class.
       Object msg = req.getData();
       try {
           // handle data.
           //开始准备调用业务层的 @service方法,类似springMvc调用controller方法
           //这个handler的实例是DubboProtocol的一个内部类
           Object result = handler.reply(channel, msg);
           res.setStatus(Response.OK);
           res.setResult(result);
       } catch (Throwable e) {
           res.setStatus(Response.SERVICE_ERROR);
           res.setErrorMessage(StringUtils.toString(e));
       }
       return res;
   }

   static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

handleResponse()的代码逻辑要和HeaderExchangeChannel.receive()方法联系起来看,因为发送request和处理收到response是不同的线程处理的,这里也是想明白发送request之后,如何拿到response的关键。

下面单独看下DefaultFuture的received()方法:

public static void received(Channel channel, Response response) {
      try {
        //这里是根据消息ID来匹配收到的response属于哪个request
        //所以server端在回复消息时都需要带上request传递过去的id
          DefaultFuture future = FUTURES.remove(response.getId());
          if (future != null) {
              future.doReceived(response);
          } else {
              logger.warn("The timeout response finally returned at "
                      + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                      + ", response " + response
                      + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                      + " -> " + channel.getRemoteAddress()));
          }
      } finally {
          CHANNELS.remove(response.getId());
      }
  }
private void doReceived(Response res) {
      lock.lock();
      try {
        //赋值结果
          response = res;
          if (done != null) {
            //唤醒request等待线程,业务层执行的get()方法即可拿到结果,阻塞结束
              done.signal();
          }
      } finally {
          lock.unlock();
      }
      if (callback != null) {
          invokeCallback(callback);
      }
  }

  @Override
   public Object get(int timeout) throws RemotingException {
       if (timeout <= 0) {
           timeout = Constants.DEFAULT_TIMEOUT;
       }
       if (!isDone()) {
           long start = System.currentTimeMillis();
           lock.lock();
           try {
               while (!isDone()) {
                 //这里是一个while循环一直判断是否拿到结果,如果没有拿到结果就一直等待
                 //如上doReceived里所示,当有结果了这个线程会被唤醒
                   done.await(timeout, TimeUnit.MILLISECONDS);
                   if (isDone() || System.currentTimeMillis() - start > timeout) {
                       break;
                   }
               }
           } catch (InterruptedException e) {
               throw new RuntimeException(e);
           } finally {
               lock.unlock();
           }
           if (!isDone()) {
               throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
           }
       }
       return returnFromResponse();
   }

相关文章

网友评论

      本文标题:dubbo consumer端发送request与接收respo

      本文链接:https://www.haomeiwen.com/subject/jtsofctx.html