Dubbo Client NIO Handling

Author: huiwq1990 | Published 2017-06-28 20:04

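    Starting the request

    The main job at this stage is to decide whether the call is asynchronous. One-way calls are handled as a separate case.

    com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke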

        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            // One-way call: send the request and do not wait for a reply.
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            // Asynchronous call: build the Future and store it in the thread-local RpcContext.
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            RpcContext.getContext().setFuture(null);
            // Synchronous call: call get() directly and return the result.
            return (Result) currentClient.request(inv, timeout).get();
        }
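The three branches of doInvoke can be illustrated outside Dubbo with plain JDK types. This is only a minimal sketch under stated assumptions: `InvokeModeSketch`, `Mode`, `CONTEXT_FUTURE` and the `transport` function are hypothetical names standing in for `DubboInvoker`, the `isOneway`/`isAsync` flags, `RpcContext` and `currentClient.request(...)`. None of them is Dubbo's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical stand-in for the three branches of doInvoke; not Dubbo's API. */
class InvokeModeSketch {
    enum Mode { ONEWAY, ASYNC, SYNC }

    // Plays the role of RpcContext: at most one pending future per calling thread.
    static final ThreadLocal<CompletableFuture<String>> CONTEXT_FUTURE = new ThreadLocal<>();

    // 'transport' stands in for currentClient.request(...): it returns a future for the reply.
    static String invoke(Mode mode, String request, long timeoutMs,
                         Function<String, CompletableFuture<String>> transport) {
        switch (mode) {
            case ONEWAY:
                transport.apply(request);   // fire and forget: the reply is never read
                CONTEXT_FUTURE.set(null);
                return null;                // empty result
            case ASYNC:
                // Park the future in the thread-local; the caller fetches it later.
                CONTEXT_FUTURE.set(transport.apply(request));
                return null;                // empty result for now
            default:
                CONTEXT_FUTURE.set(null);
                try {
                    // Synchronous: block on the future right away.
                    return transport.apply(request).get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    throw new IllegalStateException("invoke failed: " + e, e);
                }
        }
    }
}
```

In this sketch an asynchronous caller would read `CONTEXT_FUTURE.get()` after `invoke` returns. This is the same pattern as retrieving the future from `RpcContext` in the code above.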
    

    Building the network request and creating the Future

    This step returns the Future object that the previous step needs.

    com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)

        // Create the request; the request ID is assigned in the constructor.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            // Sending failed, so nobody will ever complete this future.
            future.cancel();
            throw e;
        }
        return future;
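The idea behind this step is that every request gets a unique ID and its future is registered under that ID before the bytes go out. A minimal sketch with JDK classes follows. `PendingRequestsSketch`, `Req` and the `sender` callback are hypothetical names, and the use of `AtomicLong` for ID allocation is an assumption made for illustration rather than something the excerpt shows.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical sketch of "assign an ID, register a future, then send". */
class PendingRequestsSketch {
    private static final AtomicLong NEXT_ID = new AtomicLong();
    // Pending futures keyed by request ID, waiting for their response.
    static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    static final class Req {
        final long id = NEXT_ID.getAndIncrement(); // ID assigned at construction time
        final Object data;
        Req(Object data) { this.data = data; }
    }

    // 'sender' stands in for channel.send(req).
    static CompletableFuture<Object> request(Object data, Consumer<Req> sender) {
        Req req = new Req(data);
        CompletableFuture<Object> future = new CompletableFuture<>();
        FUTURES.put(req.id, future);   // register before sending so a fast reply is not lost
        try {
            sender.accept(req);
        } catch (RuntimeException e) {
            FUTURES.remove(req.id);    // nobody will answer: drop and cancel the future
            future.cancel(false);
            throw e;
        }
        return future;
    }
}
```

Registering before sending matters because the response may arrive on another thread before `request` has returned.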
    

    Executing the network request

    This is where the data is actually sent. Once this method returns, the sending phase is complete.

    com.alibaba.dubbo.remoting.transport.netty.NettyChannel#send

        public void send(Object message, boolean sent) throws RemotingException {
            super.send(message, sent);

            boolean success = true;
            int timeout = 0;
            try {
                ChannelFuture future = channel.write(message);
                if (sent) {
                    // Wait until the message has been written out, up to the configured timeout.
                    timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                    success = future.await(timeout);
                }
                Throwable cause = future.getCause();
                if (cause != null) {
                    throw cause;
                }
            } catch (Throwable e) {
                // Do not swallow the failure: wrap it and report it to the caller.
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                        + ", cause: " + e.getMessage(), e);
            }
            if (!success) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                        + " in timeout(" + timeout + "ms) limit");
            }
        }
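The effect of the `sent` flag can be shown with a simplified sketch: when it is set, the caller blocks until the write completes or the timeout expires; otherwise it returns immediately. Here `CompletableFuture<Void>` stands in for Netty's `ChannelFuture`, and `SendAwaitSketch` and `SendException` are hypothetical names (an unchecked stand-in for `RemotingException`). Unlike the method above, the sketch does not inspect an already-failed write when `sent` is false.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of "optionally wait for the write to finish". */
class SendAwaitSketch {
    /** Unchecked stand-in for RemotingException. */
    static final class SendException extends RuntimeException {
        SendException(String message, Throwable cause) { super(message, cause); }
    }

    // 'writeFuture' stands in for the ChannelFuture returned by channel.write(message).
    static void send(CompletableFuture<Void> writeFuture, boolean sent, long timeoutMs) {
        if (!sent) {
            return; // simplified: the caller does not wait for the write at all
        }
        try {
            writeFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new SendException("Failed to send message in timeout(" + timeoutMs + "ms) limit", e);
        } catch (ExecutionException e) {
            // The write itself failed: surface the underlying cause.
            throw new SendException("Failed to send message, cause: " + e.getCause(), e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SendException("Interrupted while waiting for the write", e);
        }
    }
}
```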
    

    Decoding received data

    Decoding is the first thing done with received data.

    The request ID and the returned result are decoded in:
    com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody

        protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
            byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
            Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
            // Read the request ID from the header.
            long id = Bytes.bytes2long(header, 4);
            if ((flag & FLAG_REQUEST) == 0) {
                // Decode a response.
                Response res = new Response(id);
                if ((flag & FLAG_EVENT) != 0) {
                    res.setEvent(Response.HEARTBEAT_EVENT);
                }
                // Read the status.
                byte status = header[3];
                res.setStatus(status);
                if (status == Response.OK) {
                    try {
                        Object data;
                        if (res.isHeartbeat()) {
                            data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                        } else if (res.isEvent()) {
                            data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                        } else {
                            DecodeableRpcResult result;
                            // Whether decoding runs in the IO thread; the default is true.
                            if (channel.getUrl().getParameter(
                                    Constants.DECODE_IN_IO_THREAD_KEY,
                                    Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                                result = new DecodeableRpcResult(channel, res, is,
                                        (Invocation) getRequestData(id), proto);
                                // Decode right here, in the IO thread.
                                result.decode();
                            } else {
                                // Copy the raw bytes now; decoding is deferred.
                                result = new DecodeableRpcResult(channel, res,
                                        new UnsafeByteArrayInputStream(readMessageData(is)),
                                        (Invocation) getRequestData(id), proto);
                            }
                            data = result;
                        }
                        // Set the result on the response.
                        res.setResult(data);
                    } catch (Throwable t) {
                        res.setStatus(Response.CLIENT_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                } else {
                    res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
                }
                return res;
            }
            // (The branch that decodes requests is omitted from this excerpt.)
        }
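The header fields read by decodeBody (`header[2]` for the flags, `header[3]` for the status, and the 8-byte ID at offset 4) can be exercised with a small standalone parser. This is a sketch, not Dubbo code: `DubboHeaderSketch` is a hypothetical class, and the constant values, the 16-byte header length, the magic number and the body-length field at offset 12 are assumptions based on the commonly documented Dubbo 2.x wire format rather than anything shown in the excerpt.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for illustration; not part of Dubbo. */
final class DubboHeaderSketch {
    // Assumed values, following the commonly documented Dubbo 2.x header layout.
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    final boolean request;
    final boolean event;
    final int serializationId;
    final byte status;
    final long id;
    final int bodyLength;

    private DubboHeaderSketch(ByteBuffer buf) {
        byte flag = buf.get(2);                      // same byte as header[2]
        request = (flag & FLAG_REQUEST) != 0;
        event = (flag & FLAG_EVENT) != 0;
        serializationId = flag & SERIALIZATION_MASK; // low five bits
        status = buf.get(3);                         // same byte as header[3]
        id = buf.getLong(4);                         // big-endian, like Bytes.bytes2long(header, 4)
        bodyLength = buf.getInt(12);
    }

    static DubboHeaderSketch parse(byte[] header) {
        if (header.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("header too short");
        }
        ByteBuffer buf = ByteBuffer.wrap(header);    // ByteBuffer is big-endian by default
        if (buf.getShort(0) != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        return new DubboHeaderSketch(buf);
    }

    // Builds a response header (request bit cleared) so the parser can be tried out.
    static byte[] encodeResponseHeader(long id, byte status, int serializationId, int bodyLength) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH);
        buf.putShort(MAGIC);
        buf.put((byte) (serializationId & SERIALIZATION_MASK));
        buf.put(status);
        buf.putLong(id);
        buf.putInt(bodyLength);
        return buf.array();
    }
}
```

Because the ID travels in the header, the client can match a response to its request before deserializing the body.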
    

    Handling the returned message

    Heartbeats are told apart from business messages, and business processing is handed off to a thread pool.

    Breakpoint: com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received

        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService cexecutor = getExecutorService();
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }
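The dispatch pattern used here, where the IO thread only enqueues a task and a worker thread runs the handler, can be sketched with a plain `ExecutorService`. `DispatchSketch` and its `handler` callback are hypothetical names. The sketch returns a `Future` only so that the hand-off can be observed, which the method above does not do.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical sketch of handing received messages to a worker pool. */
class DispatchSketch {
    private final ExecutorService workers;
    private final Consumer<Object> handler;

    DispatchSketch(ExecutorService workers, Consumer<Object> handler) {
        this.workers = workers;
        this.handler = handler;
    }

    // Called on the IO thread: it only submits the task and returns at once.
    Future<?> received(Object message) {
        try {
            return workers.submit(() -> handler.accept(message));
        } catch (RuntimeException e) {
            // For example, the pool rejected the task because it is shut down or saturated.
            throw new IllegalStateException("error when processing received event", e);
        }
    }
}
```

Keeping business logic off the IO thread means a slow handler cannot stall reads and writes for other requests on the same connection.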
    

    Then, running in the thread pool: com.alibaba.dubbo.remoting.transport.DecodeHandler#decode
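When decoding is not done in the IO thread, the result object only carries the raw bytes and is decoded later on a worker thread. A minimal sketch of such a decode-once object follows. `LazyDecodeSketch` is a hypothetical class, and the UTF-8 string decoding is a placeholder for real deserialization.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of a result that is decoded at most once, on demand. */
class LazyDecodeSketch {
    private final byte[] raw;   // bytes copied off the wire by the IO thread
    private boolean decoded;
    private String value;
    private int decodeCount;    // kept only to show that decoding runs once

    LazyDecodeSketch(byte[] raw) {
        this.raw = raw;
    }

    // Safe to call from either the IO thread or a worker thread; later calls are no-ops.
    synchronized void decode() {
        if (decoded) {
            return;
        }
        value = new String(raw, StandardCharsets.UTF_8); // placeholder for deserialization
        decodeCount++;
        decoded = true;
    }

    synchronized String value() {
        decode();
        return value;
    }

    synchronized int decodeCount() {
        return decodeCount;
    }
}
```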

    Handling the callback

    Breakpoint: com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#doReceived

    The task run by the thread pool: com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run

    com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received

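        public static void received(Channel channel, Response response) {
            try {
                // Look up (and remove) the Future that was registered under this ID when the request was sent.
                DefaultFuture future = FUTURES.remove(response.getId());
                if (future != null) {
                    future.doReceived(response);
                } else {
                    // No pending Future for this ID, most likely because the request
                    // already timed out. This branch is left empty in this excerpt.
                }
            } finally {
                CHANNELS.remove(response.getId());
            }
        }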
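The matching of a response to the thread waiting for it can be sketched as a future with a lock and a condition: `get` blocks until `received` looks up the future by ID and signals it. `ResponseFutureSketch` is a hypothetical class written for illustration, and its timeout handling is an assumption, not a copy of `DefaultFuture`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of a response future keyed by request ID. */
class ResponseFutureSketch {
    static final Map<Long, ResponseFutureSketch> FUTURES = new ConcurrentHashMap<>();

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private final long id;
    private Object response;    // guarded by lock
    private boolean completed;  // guarded by lock

    ResponseFutureSketch(long id) {
        this.id = id;
        FUTURES.put(id, this);  // registered when the request is created
    }

    // Called by the requesting thread; blocks until a response arrives or the timeout expires.
    Object get(long timeoutMs) {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (!completed) {
                if (remaining <= 0) {
                    FUTURES.remove(id); // a late response will find nothing to complete
                    throw new IllegalStateException("timeout waiting for response " + id);
                }
                remaining = done.awaitNanos(remaining);
            }
            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            lock.unlock();
        }
    }

    // Called by a worker thread; returns false when no future is pending for this ID.
    static boolean received(long id, Object response) {
        ResponseFutureSketch future = FUTURES.remove(id);
        if (future == null) {
            return false;
        }
        future.doReceived(response);
        return true;
    }

    private void doReceived(Object value) {
        lock.lock();
        try {
            response = value;
            completed = true;
            done.signalAll();   // wake the thread blocked in get()
        } finally {
            lock.unlock();
        }
    }
}
```

Removing the entry in `received` rather than just reading it ensures each response completes at most one future and keeps the map from growing.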
    

