美文网首页
dubbo consumer端发送request与接收respo

dubbo consumer端发送request与接收respo

作者: 雕兄L | 来源:发表于2019-06-23 11:43 被阅读0次
    本篇内容主要分析client端发送的request和接收的response是如何在Exchanger层和Transport层流转。
    • 如何发送request请求

    在执行request时,是从DubboProtocol的refer()方法里拿到一个DubboInvoker实例,然后request就从protocol层进入exchanger层。DubboInvoker里的client其实是一个被包装了很多层的对象。首先从ReferenceCountExchangeClient说起,ReferenceCountExchangeClient拿到请求后把请求直接转发给了HeaderExchangeClient

    @Override
       protected Result doInvoke(final Invocation invocation) throws Throwable {
           RpcInvocation inv = (RpcInvocation) invocation;
           final String methodName = RpcUtils.getMethodName(invocation);
           inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
           inv.setAttachment(Constants.VERSION_KEY, version);
    
           ExchangeClient currentClient;
           if (clients.length == 1) {
               currentClient = clients[0];
           } else {
               currentClient = clients[index.getAndIncrement() % clients.length];
           }
           try {
               boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
               boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
               int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
               if (isOneway) {
                   boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                   currentClient.send(inv, isSent);
                   RpcContext.getContext().setFuture(null);
                   return new RpcResult();
               } else if (isAsync) {
                   ResponseFuture future = currentClient.request(inv, timeout);
                   RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                   return new RpcResult();
               } else {
                   RpcContext.getContext().setFuture(null);
                   //client开始执行的地方
                   return (Result) currentClient.request(inv, timeout).get();
               }
           } catch (TimeoutException e) {
               throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
           } catch (RemotingException e) {
               throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
           }
       }
    

    HeaderExchangeClient是把请求转发给了channel,通过构造方法可以看到,其channel实例是exchanger层的HeaderExchangeChannel

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
         if (client == null) {
             throw new IllegalArgumentException("client == null");
         }
         this.client = client;
         //channel实例化 注意client被传递给了HeaderExchangeChannel
         this.channel = new HeaderExchangeChannel(client);
         String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
         this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
         this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
         if (heartbeatTimeout < heartbeat * 2) {
             throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
         }
         if (needHeartbeat) {
             startHeartbeatTimer();
         }
     }
    

    再来到HeaderExchangeChannel的request方法,发现是又转给了channel.send(),那么我们可以推测,这个channel应该就是transport层的channel了,其实这channel就是之前提到的NettyClient,这里request从exchange层流转到了transport层

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            Request req = new Request();
            req.setVersion("2.0.0");
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }
    

    NettyClient继承的类有点多,就不一一列列举了,具体就是从父类AbstractPeer的send方法一直执行到AbstractClient的send方法,AbstractClient的send方法如下:

    public void send(Object message, boolean sent) throws RemotingException {
         if (send_reconnect && !isConnected()) {
             connect();
         }
         //注意这里有一个getChannel方法,是子类需要实现的方法,
         //咋们看下NettyClient的实现就知道什么意思了
         Channel channel = getChannel();
         //TODO Can the value returned by getChannel() be null? need improvement.
         if (channel == null || !channel.isConnected()) {
             throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
         }
         channel.send(message, sent);
     }
    

    NettyClient的getChannel()实现可以看到,是直接把NettyClient的channel成员变量包装了下返回了,这个channel就是Netty的原生channel对象。

    @Override
    protected com.alibaba.dubbo.remoing.Channel getChannel() {
      //这里把connect()方法建立的channel对象赋值过来(SocketChannel)
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
    

    感觉挺绕的,到了这里又多出来个NettyChannel,NettyChannel的getOrAddChannel如下代码所示,是用NettyChannel对象包装了Netty原生channel,然后以Netty原生的channel对象为key,存起来了。NettyChannel对应原生的Channel对象,一个client与server之间拥有一个NettyChannel对象,从这里可以先预判,各个client与server之间交互是保持长连接的。这个NettyChannel实现了dubbo的channel接口,主要实现了写事件。

    static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
         if (ch == null) {
             return null;
         }
         NettyChannel ret = channelMap.get(ch);
         if (ret == null) {
             NettyChannel nc = new NettyChannel(ch, url, handler);
             if (ch.isConnected()) {
                 ret = channelMap.putIfAbsent(ch, nc);
             }
             if (ret == null) {
                 ret = nc;
             }
         }
         return ret;
     }
     public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
    
        boolean success = true;
        int timeout = 0;
        try {
          //原生的Netty写事件,把数据写入channel,即开始向远端服务发送请求
            ChannelFuture future = channel.write(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
    
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
    

    回到AbstractClient的send方法里,拿到channel之后,又执行了channel的send方法,其实就是执行了NettyChannel的send方法,看上面的代码可以看到,是调用了Netty原生的channel把数据写出去了.

    到这里,request就已经发送出去了。感觉dubbo发送个请求真的是太绕了,绕晕了有没有.


    • 如何处理返回的Response结果

    发送完request之后还得处理返回的response,response的处理应该由handler来完成,即应该由底层io捕捉读事件,把收到的response传到transport层。NettyHanler是处理Netty读写事件的handler,在NettyClient里初始化NettyHandler拦截器时,把自己传递给了NettyHandler。所以NettyClient本身也充当了一个handler角色,是一个包装了其他handler的handler。
    回过头再来看下NettyClient的初始化化逻辑,构造方法里有一段逻辑调用了父类的wrapChannelHandler()方法,通过方法名可以看出,这是在初始化父类的handler实例,并且这个handler被包装了。

    public class NettyClient extends AbstractClient {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    
        // ChannelFactory's closure has a DirectMemory leak, using static to avoid
        // https://issues.jboss.org/browse/NETTY-424
        private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
                Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
                Constants.DEFAULT_IO_THREADS);
        private ClientBootstrap bootstrap;
    
        private volatile Channel channel; // volatile, please copy reference to use
    
        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
          //执行handler包装初始化逻辑
            super(url, wrapChannelHandler(url, handler));
        }
    
        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            bootstrap = new ClientBootstrap(channelFactory);
            // config
            // @see org.jboss.netty.channel.socket.SocketChannelConfig
            bootstrap.setOption("keepAlive", true);
            bootstrap.setOption("tcpNoDelay", true);
            bootstrap.setOption("connectTimeoutMillis", getTimeout());
            //初始化NettyHandler时,把这个NettyClient实例传递了过去
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                @Override
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    //把NettyHandler加入拦截链,由这个handler处理netty的各种读写事件
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
        }
    
        ....省略掉部分代码
        @Override
        protected org.apache.dubbo.remoting.Channel getChannel() {
            Channel c = channel;
            if (c == null || !c.isConnected())
                return null;
            return NettyChannel.getOrAddChannel(c, getUrl(), this);
        }
    }
    

    进入wrapChannelHandler()方法,最后看到的是这样一段逻辑:

    public class ChannelHandlers {
    
        private static ChannelHandlers INSTANCE = new ChannelHandlers();
    
        protected ChannelHandlers() {
        }
    
        public static ChannelHandler wrap(ChannelHandler handler, URL url) {
            return ChannelHandlers.getInstance().wrapInternal(handler, url);
        }
    
        protected static ChannelHandlers getInstance() {
            return INSTANCE;
        }
    
        static void setTestingChannelHandlers(ChannelHandlers instance) {
            INSTANCE = instance;
        }
    
        protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
          //初始化代码
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
          }
        }
    

    那么现在的情况是NettyClient包装了MultiMessageHandler,MultiMessageHandler包装了HeartbeatHandler,HeartbeatHandler又把请求代理给了Dispatcher,Dispatcher大概有AllDispatcher,DirectDispatcher,ExecutionDispatcher,MessageOnlyDispatcher 几个实现,默认使用的是AllDispatcher,即把请求代理给了AllChannelHandler,回头再来看AllChannelHandler的实现细节。

    再看下NettyClient的继承关系:NettyClient extend AbstractClient extend AbstractEndpoint extend AbstractPeer,通过查看多个父类,可以看到是AbstractPeer的初始逻辑

    public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    
        private final ChannelHandler handler;
    
        private volatile URL url;
    
        // closing closed means the process is being closed and close is finished
        private volatile boolean closing;
    
        private volatile boolean closed;
    
        public AbstractPeer(URL url, ChannelHandler handler) {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            this.url = url;
            //赋值包装完的handler
            this.handler = handler;
          }
    
          @Override
           public void received(Channel ch, Object msg) throws RemotingException {
               if (closed) {
                   return;
               }
               //收到消息后,也是直接转给了handler处理
               handler.received(ch, msg);
           }
    

    把client的handler初始化逻辑说完了,再回到NettyHandler的receive()方法:

    @Override
       public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
           NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
           try {
               handler.received(channel, e.getMessage());
           } finally {
               NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
           }
       }
    

    可以看到在收到Netty的读事件后,是直接把请求又转发给了handler,也就是NettyClient的receive()方法去处理,也就是其父类的receive方法

    到这一步等于收到的response被从底层被传递到了transport层和exchanger层。
    因为AbstractPeer的handler实例是MultiMessageHandler,所以请求到了MultiMessageHandler,这个handler重写了receive方法,看样子是实现组合消息的解码,暂时不知道如何使用,这里直接跳过,进入到HeartbeatHanler .

    @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof MultiMessage) {
                MultiMessage list = (MultiMessage) message;
                for (Object obj : list) {
                    handler.received(channel, obj);
                }
            } else {
                handler.received(channel, message);
            }
        }
    

    HeartbeatHanler extends AbstractChannelHandlerDelegate, 是判断收到的请求是否与心跳相关,如果是心跳request,则回复消息,如果是心跳response,则消息到此结束,不再回复.

    @Override
       public void received(Channel channel, Object message) throws RemotingException {
           setReadTimestamp(channel);
           if (isHeartbeatRequest(message)) {
               Request req = (Request) message;
               if (req.isTwoWay()) {
                 //需要回复心跳的情况,回复消息(注意要把消息id带上)
                   Response res = new Response(req.getId(), req.getVersion());
                   res.setEvent(Response.HEARTBEAT_EVENT);
                   channel.send(res);
                   if (logger.isInfoEnabled()) {
                       int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                       if (logger.isDebugEnabled()) {
                           logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                   + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                   + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                       }
                   }
               }
               return;
           }
           if (isHeartbeatResponse(message)) {
               if (logger.isDebugEnabled()) {
                   logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
               }
               return;
           }
           //不是心跳 最后接着转发给下一个handler
           handler.received(channel, message);
    

    HeartbeatHanler的handler实例是AllChannelHandler,到这里就进入了dispatcher的逻辑了,前面已经介绍过了,Dispatcher由AllChannelHandler来实现,这个handler的作用就是同步转异步,把请求异步转发给下一个handler实例处理(ChannelEventRunnable内部实现就是转发handler事件),之前的几个handler是由Netty的线程执行在执行,到这一步Netty的线程就执行完毕进入空闲状态。到这一步也可以看到,收到response后,Netty的Channel是没有被关闭的,验证了之前长连接通信的猜想。

    public class AllChannelHandler extends WrappedChannelHandler {
    
        public AllChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
        }
    
        @Override
        public void connected(Channel channel) throws RemotingException {
            ExecutorService cexecutor = getExecutorService();
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
            } catch (Throwable t) {
                throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
            }
        }
    
        @Override
        public void disconnected(Channel channel) throws RemotingException {
            ExecutorService cexecutor = getExecutorService();
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
            } catch (Throwable t) {
                throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
            }
        }
    
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService cexecutor = getExecutorService();
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
                //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
                if(message instanceof Request && t instanceof RejectedExecutionException){
                    Request request = (Request)message;
                    if(request.isTwoWay()){
                        String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }
    
        @Override
        public void caught(Channel channel, Throwable exception) throws RemotingException {
            ExecutorService cexecutor = getExecutorService();
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
            } catch (Throwable t) {
                throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
            }
        }
    }
    

    AllChannelHandlerhandler实例是DecodeHandler extends AbstractChannelHandlerDelegate,AbstractChannelHandlerDelegate没有具体的代码实现,就是简单转发请求。
    直接看DecodeHandler的实现,也是代理了handler请求,加入了解码功能,debug下了收到Response时解码情况,收到response后拿出DecodeableRpcResult进行解码,不过之前已经有解码过了,是在Netty的原生handler链里加入了Code2解码类进行解码,所以这里其实没有再解码,应该是一个扩展功能,另外考虑到这是Netty实现的上层,估计是一个整体的抽象,可能会用到解码代码.

    public class DecodeHandler extends AbstractChannelHandlerDelegate {
    
        private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);
    
        public DecodeHandler(ChannelHandler handler) {
            super(handler);
        }
    
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Decodeable) {
                decode(message);
            }
    
            if (message instanceof Request) {
                decode(((Request) message).getData());
            }
    
            if (message instanceof Response) {
              //取出DecodeableRpcResult 进行解码
                decode(((Response) message).getResult());
            }
            //请求再转发
            handler.received(channel, message);
        }
    
        private void decode(Object message) {
            if (message != null && message instanceof Decodeable) {
                try {
                    ((Decodeable) message).decode();
                    if (log.isDebugEnabled()) {
                        log.debug("Decode decodeable message " + message.getClass().getName());
                    }
                } catch (Throwable e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                    }
                } // ~ end of catch
            } // ~ end of if
        } // ~ end of method decod
    }
    

    在上一篇文章DubboProtocol分析中已经知道,DecodeHandler的handler实例是HeaderExchangeHandler,再看HeaderExchangeHandler的实现,该类实现了Channel的所有事件处理,到这一步数据已经从Transport层流回流到了Exchange层。HeaderExchangeHandler是一个比较重要的类,把其中两个重要的方法贴出来:

    public void received(Channel channel, Object message) throws RemotingException {
           channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
           ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
           try {
               if (message instanceof Request) {
                   // handle request.
                   Request request = (Request) message;
                   //事件指的应该就是心跳
                   if (request.isEvent()) {
                       handlerEvent(channel, request);
                   } else {
                     //双向请求,指的就是需要回复的请求
                       if (request.isTwoWay()) {
                           Response response = handleRequest(exchangeChannel, request);
                           channel.send(response);
                       } else {
                           handler.received(exchangeChannel, request.getData());
                       }
                   }
               } else if (message instanceof Response) {
                 //处理收到的response
                   handleResponse(channel, (Response) message);
               } else if (message instanceof String) {
                   if (isClientSide(channel)) {
                       Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                       logger.error(e.getMessage(), e);
                   } else {
                       String echo = handler.telnet(channel, (String) message);
                       if (echo != null && echo.length() > 0) {
                           channel.send(echo);
                       }
                   }
               } else {
                   handler.received(exchangeChannel, message);
               }
           } finally {
               HeaderExchangeChannel.removeChannelIfDisconnected(channel);
           }
       }
    //处理收到的request请求
       Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
         //构造response对象,把request的id赋值上,对请求者来说,这是一个标识
           Response res = new Response(req.getId(), req.getVersion());
           //这个不知道什么情况会跳到这里
           if (req.isBroken()) {
               Object data = req.getData();
    
               String msg;
               if (data == null) msg = null;
               else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
               else msg = data.toString();
               res.setErrorMessage("Fail to decode request due to: " + msg);
               res.setStatus(Response.BAD_REQUEST);
    
               return res;
           }
           // find handler by message class.
           Object msg = req.getData();
           try {
               // handle data.
               //开始准备调用业务层的 @service方法,类似springMvc调用controller方法
               //这个handler的实例是DubboProtocol的一个内部类
               Object result = handler.reply(channel, msg);
               res.setStatus(Response.OK);
               res.setResult(result);
           } catch (Throwable e) {
               res.setStatus(Response.SERVICE_ERROR);
               res.setErrorMessage(StringUtils.toString(e));
           }
           return res;
       }
    
       static void handleResponse(Channel channel, Response response) throws RemotingException {
            if (response != null && !response.isHeartbeat()) {
                DefaultFuture.received(channel, response);
            }
        }
    
    

    handleResponse()的代码逻辑要和HeaderExchangeChannel.receive()方法联系起来看,因为发送request和处理收到response是不同的线程处理的,这里也是想明白发送request之后,如何拿到response的关键。

    下面单独看下DefaultFuture的received()方法:

    public static void received(Channel channel, Response response) {
          try {
            //这里是根据消息ID来匹配收到的response属于哪个request
            //所以server端在回复消息时都需要带上request传递过去的id
              DefaultFuture future = FUTURES.remove(response.getId());
              if (future != null) {
                  future.doReceived(response);
              } else {
                  logger.warn("The timeout response finally returned at "
                          + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                          + ", response " + response
                          + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                          + " -> " + channel.getRemoteAddress()));
              }
          } finally {
              CHANNELS.remove(response.getId());
          }
      }
    private void doReceived(Response res) {
          lock.lock();
          try {
            //赋值结果
              response = res;
              if (done != null) {
                //唤醒request等待线程,业务层执行的get()方法即可拿到结果,阻塞结束
                  done.signal();
              }
          } finally {
              lock.unlock();
          }
          if (callback != null) {
              invokeCallback(callback);
          }
      }
    
      @Override
       public Object get(int timeout) throws RemotingException {
           if (timeout <= 0) {
               timeout = Constants.DEFAULT_TIMEOUT;
           }
           if (!isDone()) {
               long start = System.currentTimeMillis();
               lock.lock();
               try {
                   while (!isDone()) {
                     //这里是一个while循环一直判断是否拿到结果,如果没有拿到结果就一直等待
                     //如上doReceived里所示,当有结果了这个线程会被唤醒
                       done.await(timeout, TimeUnit.MILLISECONDS);
                       if (isDone() || System.currentTimeMillis() - start > timeout) {
                           break;
                       }
                   }
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               } finally {
                   lock.unlock();
               }
               if (!isDone()) {
                   throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
               }
           }
           return returnFromResponse();
       }
    

    相关文章

      网友评论

          本文标题:dubbo consumer端发送request与接收respo

          本文链接:https://www.haomeiwen.com/subject/jtsofctx.html