Dubbo Consumer Encoding and Decoding Process

Author: 晴天哥_王志 | Published: 2020-01-20 16:23

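    Dubbo Protocol

    Protocol Overview

    The Dubbo framework defines its own private RPC protocol. The layout of the request and response header is shown in the figure below.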


    [Figure: Dubbo protocol header layout, /dev-guide/images/dubbo_protocol_header.jpg]

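    Protocol Details

    • Magic - Magic High & Magic Low (16 bits)

      The magic number that identifies the Dubbo protocol: 0xdabb.

    • Req/Res (1 bit)

      Marks the message as a request or a response. Request: 1; response: 0.

    • 2 Way (1 bit)

      Only meaningful when Req/Res is 1 (request). Marks whether a return value is expected from the server; set to 1 if one is needed.

    • Event (1 bit)

      Marks whether the message is an event, for example a heartbeat. Set to 1 for an event.

    • Serialization ID (5 bits)

      Identifies the serialization type; for example, fastjson is 6.

    • Status (8 bits)

      Only meaningful when Req/Res is 0 (response). Identifies the response status.

      • 20 - OK
      • 30 - CLIENT_TIMEOUT
      • 31 - SERVER_TIMEOUT
      • 40 - BAD_REQUEST
      • 50 - BAD_RESPONSE
      • 60 - SERVICE_NOT_FOUND
      • 70 - SERVICE_ERROR
      • 80 - SERVER_ERROR
      • 90 - CLIENT_ERROR
      • 100 - SERVER_THREADPOOL_EXHAUSTED_ERROR
    • Request ID (64 bits)

      Uniquely identifies the request. Type: long.

    • Data Length (32 bits)

      Length of the serialized content (the variable part), counted in bytes. Type: int.

    • Variable Part

      After serialization by the type named by the Serialization ID, each part is a byte[] or a byte.

      • For a request (Req/Res = 1), the parts are, in order:
        • Dubbo version
        • Service name
        • Service version
        • Method name
        • Method parameter types
        • Method arguments
        • Attachments
      • For a response (Req/Res = 0), the parts are, in order:
        • Return value type (byte), identifying the kind of value returned by the server:
          • Null return value: RESPONSE_NULL_VALUE 2
          • Normal return value: RESPONSE_VALUE 1
          • Exception: RESPONSE_WITH_EXCEPTION 0
        • Return value: the response bytes returned by the server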
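
    To make the 16-byte header layout concrete, here is a minimal sketch that packs and unpacks it with plain java.nio. The class DubboHeaderSketch and its method names are hypothetical and are not part of Dubbo; only the field offsets, the flag bits and the magic value come from the description above. The serialization ID 2 used in main is just a sample value.

```java
import java.nio.ByteBuffer;

/** Illustrative helper (not a Dubbo class): packs and unpacks the 16-byte header. */
final class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    static final int FLAG_REQUEST = 0x80;       // Req/Res bit
    static final int FLAG_TWOWAY = 0x40;        // 2 Way bit
    static final int FLAG_EVENT = 0x20;         // Event bit
    static final int SERIALIZATION_MASK = 0x1f; // low 5 bits: Serialization ID

    /** Builds a request header; the status byte (offset 3) stays 0 for requests. */
    static byte[] requestHeader(boolean twoWay, boolean event, int serializationId,
                                long requestId, int bodyLength) {
        int flags = FLAG_REQUEST | (serializationId & SERIALIZATION_MASK);
        if (twoWay) flags |= FLAG_TWOWAY;
        if (event) flags |= FLAG_EVENT;
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH); // big-endian by default
        buf.putShort(MAGIC);     // offsets 0-1: magic
        buf.put((byte) flags);   // offset 2: flags + serialization ID
        buf.put((byte) 0);       // offset 3: status, unused in a request
        buf.putLong(requestId);  // offsets 4-11: request ID
        buf.putInt(bodyLength);  // offsets 12-15: data length
        return buf.array();
    }

    static boolean isRequest(byte[] h) { return (h[2] & FLAG_REQUEST) != 0; }
    static boolean isTwoWay(byte[] h) { return (h[2] & FLAG_TWOWAY) != 0; }
    static boolean isEvent(byte[] h) { return (h[2] & FLAG_EVENT) != 0; }
    static int serializationId(byte[] h) { return h[2] & SERIALIZATION_MASK; }
    static int status(byte[] h) { return h[3] & 0xff; }
    static long requestId(byte[] h) { return ByteBuffer.wrap(h).getLong(4); }
    static int bodyLength(byte[] h) { return ByteBuffer.wrap(h).getInt(12); }

    public static void main(String[] args) {
        byte[] h = requestHeader(true, false, 2, 42L, 100);
        System.out.printf("flags=0x%02x id=%d len=%d%n",
                h[2] & 0xff, requestId(h), bodyLength(h));
    }
}
```

    Because the three flag bits and the 5-bit serialization ID share one byte, a reader has to mask them apart, which is what the accessor methods do.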

    Source Code Analysis

    DubboCodec Class Diagram

    • As the class diagram shows, the core logic lives in ExchangeCodec and DubboCodec.

    NettyClient

    public class NettyClient extends AbstractClient {
    
        private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
        private Bootstrap bootstrap;
        private volatile Channel channel; // volatile, please copy reference to use
    
        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            super(url, wrapChannelHandler(url, handler));
        }
    
        @Override
        protected void doOpen() throws Throwable {
            final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
            bootstrap = new Bootstrap();
            bootstrap.group(nioEventLoopGroup)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                    .channel(NioSocketChannel.class);
    
            if (getConnectTimeout() < 3000) {
                bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
            } else {
                bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
            }
    
            bootstrap.handler(new ChannelInitializer() {
    
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            // NettyCodecAdapter.getEncoder() returns the InternalEncoder instance
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyClientHandler);
                }
            });
        }
    }
    
    • NettyClient builds a NettyCodecAdapter: adapter.getEncoder() handles encoding and adapter.getDecoder() handles decoding.
    • NettyCodecAdapter is therefore the next class to look at.

    NettyCodecAdapter

    final class NettyCodecAdapter {
    
        private final ChannelHandler encoder = new InternalEncoder();
        private final ChannelHandler decoder = new InternalDecoder();
        // a DubboCountCodec instance
        private final Codec2 codec;
        private final URL url;
        private final com.alibaba.dubbo.remoting.ChannelHandler handler;
    
        public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
            this.codec = codec;
            this.url = url;
            this.handler = handler;
        }
    
        public ChannelHandler getEncoder() {
            return encoder;
        }
    
        public ChannelHandler getDecoder() {
            return decoder;
        }
    
        private class InternalEncoder extends MessageToByteEncoder {
    
            @Override
            protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
                com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
                Channel ch = ctx.channel();
                NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
                try {
                    // codec is a DubboCountCodec instance
                    codec.encode(channel, buffer, msg);
                } finally {
                    NettyChannel.removeChannelIfDisconnected(ch);
                }
            }
        }
    
        private class InternalDecoder extends ByteToMessageDecoder {
    
            @Override
            protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
    
                ChannelBuffer message = new NettyBackedChannelBuffer(input);
                NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
                Object msg;
                int saveReaderIndex;
    
                try {
                    // decode object.
                    do {
                        saveReaderIndex = message.readerIndex();
                        try {
                            // codec is a DubboCountCodec instance
                            msg = codec.decode(channel, message);
                        } catch (IOException e) {
                            throw e;
                        }
                        if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                            message.readerIndex(saveReaderIndex);
                            break;
                        } else {
                            //is it possible to go here ?
                            if (saveReaderIndex == message.readerIndex()) {
                                throw new IOException("Decode without read data.");
                            }
                            if (msg != null) {
                                out.add(msg);
                            }
                        }
                    } while (message.readable());
                } finally {
                    NettyChannel.removeChannelIfDisconnected(ctx.channel());
                }
            }
        }
    }
    
    • NettyCodecAdapter holds the encoder, InternalEncoder.
    • NettyCodecAdapter holds the decoder, InternalDecoder.
    • The codec field of NettyCodecAdapter is a DubboCountCodec.
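
    The loop in InternalDecoder relies on one idea: remember the reader index before each decode attempt and roll back to it when the codec reports that the frame is incomplete. The sketch below shows that pattern on a java.nio.ByteBuffer. It is an illustration under simplifying assumptions: DecodeLoopSketch is a hypothetical class, and the frame format (4-byte length followed by a UTF-8 payload) is made up for the example and is not the Dubbo wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative decode loop (not Dubbo code) with reader-index rollback. */
final class DecodeLoopSketch {
    /** Sentinel playing the role of Codec2.DecodeResult.NEED_MORE_INPUT. */
    static final Object NEED_MORE_INPUT = new Object();

    /** Hypothetical codec: one frame = 4-byte length + UTF-8 payload. */
    static Object decodeOne(ByteBuffer in) {
        if (in.remaining() < 4) return NEED_MORE_INPUT;
        int len = in.getInt();
        if (in.remaining() < len) return NEED_MORE_INPUT; // bytes already consumed here
        byte[] body = new byte[len];
        in.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    /** Decodes every complete frame and leaves a partial frame unread. */
    static List<Object> decodeAll(ByteBuffer in) {
        List<Object> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int saved = in.position();   // same role as saveReaderIndex
            Object msg = decodeOne(in);
            if (msg == NEED_MORE_INPUT) {
                in.position(saved);      // roll back so the next read retries the frame
                break;
            }
            out.add(msg);
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(12);
        in.putInt(2).put("hi".getBytes(StandardCharsets.UTF_8)); // complete frame
        in.putInt(5).put("ab".getBytes(StandardCharsets.UTF_8)); // truncated frame
        in.flip();
        System.out.println(decodeAll(in) + ", bytes left: " + in.remaining());
    }
}
```

    Without the rollback, the bytes consumed while probing a partial frame would be lost, and the next network read could no longer be parsed from a frame boundary.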

    Consumer Encoding

    • Call stack when a Dubbo consumer encodes a request
      => NettyCodecAdapter.InternalEncoder.encode
      => DubboCountCodec.encode
      => ExchangeCodec.encode
      => ExchangeCodec.encodeRequest
      => DubboCodec.encodeRequestData

    DubboCountCodec

    dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
    
    public final class DubboCountCodec implements Codec2 {
    
        private DubboCodec codec = new DubboCodec();
    
        @Override
        public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
            // codec is a DubboCodec instance
            codec.encode(channel, buffer, msg);
        }
    }
    
    • DubboCountCodec calls DubboCodec.encode() to do the encoding.
    • DubboCodec.encode() is the encode() method inherited from its parent class, ExchangeCodec.

    ExchangeCodec

    public class ExchangeCodec extends TelnetCodec {
    
        // header length.
        protected static final int HEADER_LENGTH = 16;
        // magic header.
        protected static final short MAGIC = (short) 0xdabb;
        protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
        protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
        // message flag.
        protected static final byte FLAG_REQUEST = (byte) 0x80;
        protected static final byte FLAG_TWOWAY = (byte) 0x40;
        protected static final byte FLAG_EVENT = (byte) 0x20;
        protected static final int SERIALIZATION_MASK = 0x1f;
    
        public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
            if (msg instanceof Request) {
                encodeRequest(channel, buffer, (Request) msg);
            } else if (msg instanceof Response) {
                encodeResponse(channel, buffer, (Response) msg);
            } else {
                super.encode(channel, buffer, msg);
            }
        }
    
        protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
            // Look up the Serialization implementation, e.g.
            // hessian2=com.alibaba.dubbo.common.serialize.hessian2.Hessian2Serialization
            Serialization serialization = getSerialization(channel);
            // The header is HEADER_LENGTH = 16 bytes long.
            byte[] header = new byte[HEADER_LENGTH];
    
            // Bytes 1-2: the magic number (a fixed value).
            Bytes.short2bytes(MAGIC, header);
    
            // Byte 3: the request flag and serialization ID, plus the two-way and event flags.
            // Two-way means the request expects a response; one-way means nothing is returned.
            header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
            if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
            if (req.isEvent()) header[2] |= FLAG_EVENT;
    
            // Byte 4: the status byte, left unset in a request.
            // Bytes 5-12: the request ID, an 8-byte long.
            // This unique ID pairs each response with its pending request, which is
            // how the asynchronous exchange between consumer and provider is made synchronous.
            Bytes.long2bytes(req.getId(), header, 4);
    
            // Encode the request data.
            int savedWriteIndex = buffer.writerIndex();
            // Skip HEADER_LENGTH bytes so that the body starts at the 17th byte.
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
            if (req.isEvent()) {
                encodeEventData(channel, out, req.getData());
            } else {
                encodeRequestData(channel, out, req.getData(), req.getVersion());
            }
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();
            int len = bos.writtenBytes();
            // Check that the body does not exceed the payload limit (8 MB by default).
            checkPayload(channel, len);
    
            // Bytes 13-16: the body length, i.e. the length of the serialized request data.
            Bytes.int2bytes(len, header, 12);
    
            // write
            buffer.writerIndex(savedWriteIndex);
            buffer.writeBytes(header); // write the header
            // Move the writer index past both the header and the body.
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        }
    }
    
    • For a Request message, ExchangeCodec.encode() delegates to encodeRequest().
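
    encodeRequest() cannot write the header first, because the body length is only known after serialization. It therefore reserves 16 bytes, writes the body, and then goes back to fill in the header. A minimal sketch of that reserve-and-backfill technique on a java.nio.ByteBuffer follows. BackfillHeaderSketch is a hypothetical class; the body is passed in as ready-made bytes rather than produced by a Serialization, and the flag byte is hard-coded to the request bit for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative sketch (not Dubbo code): write the body first, back-fill the header. */
final class BackfillHeaderSketch {
    static final int HEADER_LENGTH = 16;

    /** Writes one frame at the buffer's current position. */
    static void writeFrame(ByteBuffer buffer, long requestId, byte[] body) {
        int start = buffer.position();           // same role as savedWriteIndex
        buffer.position(start + HEADER_LENGTH);  // reserve room for the header
        buffer.put(body);                        // stand-in for the serialized body
        int len = buffer.position() - (start + HEADER_LENGTH);

        ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);
        header.putShort((short) 0xdabb);         // magic
        header.put((byte) 0x80);                 // request flag only (assumed for the demo)
        header.put((byte) 0);                    // status, unused in a request
        header.putLong(requestId);
        header.putInt(len);                      // length known only after the body is written

        buffer.position(start);                  // jump back to the reserved slot
        buffer.put(header.array());
        buffer.position(start + HEADER_LENGTH + len); // continue after header + body
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        writeFrame(buffer, 7L, "abc".getBytes(StandardCharsets.US_ASCII));
        System.out.println("bytes written: " + buffer.position());
    }
}
```

    The alternative would be to serialize into a temporary array and copy it behind the header; writing in place avoids that extra copy.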

    DubboCodec

    public class DubboCodec extends ExchangeCodec implements Codec2 {
    
        @Override
        protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
            RpcInvocation inv = (RpcInvocation) data;
            // Dubbo version
            out.writeUTF(version);
            // Service path
            out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
            // Service version
            out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
            // Method name
            out.writeUTF(inv.getMethodName());
            // Parameter types, as JVM type descriptors
            out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
            // Argument values
            Object[] args = inv.getArguments();
            if (args != null)
                for (int i = 0; i < args.length; i++) {
                    out.writeObject(encodeInvocationArgument(channel, inv, i));
                }
            // Attachments (implicitly passed key/value data)
            out.writeObject(inv.getAttachments());
        }
    }
    
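    • When a request is encoded, the Dubbo version, service path, service version, method name, parameter types, argument values and attachments are all written out; the parameter types are encoded using the JVM's type descriptor notation.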
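
    To show what "JVM type descriptor notation" produces, here is a small self-contained sketch that turns a list of parameter types into one concatenated descriptor string. TypeDescSketch is a hypothetical stand-in written for this article; it follows the standard JVM field descriptor rules and is not a copy of Dubbo's ReflectUtils.getDesc().

```java
/** Illustrative sketch (not Dubbo code): JVM descriptors for parameter types. */
final class TypeDescSketch {

    /** Returns the JVM descriptor of one type, e.g. int -> "I", String -> "Ljava/lang/String;". */
    static String desc(Class<?> c) {
        StringBuilder sb = new StringBuilder();
        while (c.isArray()) {            // one '[' per array dimension
            sb.append('[');
            c = c.getComponentType();
        }
        if (!c.isPrimitive()) {          // object types: L<binary name with slashes>;
            return sb.append('L').append(c.getName().replace('.', '/')).append(';').toString();
        }
        char code;
        if (c == void.class) code = 'V';
        else if (c == boolean.class) code = 'Z';
        else if (c == byte.class) code = 'B';
        else if (c == char.class) code = 'C';
        else if (c == short.class) code = 'S';
        else if (c == int.class) code = 'I';
        else if (c == long.class) code = 'J';
        else if (c == float.class) code = 'F';
        else code = 'D';                 // double is the only primitive left
        return sb.append(code).toString();
    }

    /** Concatenates the descriptors of all parameter types into one string. */
    static String descAll(Class<?>... types) {
        StringBuilder sb = new StringBuilder();
        for (Class<?> t : types) {
            sb.append(desc(t));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(descAll(String.class, int.class, long[].class));
    }
}
```

    Descriptors are self-delimiting, so the receiving side can split the single string back into individual types without a separator.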

    Consumer Decoding

    • Call stack when a Dubbo consumer decodes a response
      => NettyCodecAdapter.InternalDecoder.decode
      => DubboCountCodec.decode
      => ExchangeCodec.decode
      => DubboCodec.decodeBody
      => DecodeableRpcResult.decode

    DubboCountCodec

    dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
    
    public final class DubboCountCodec implements Codec2 {
    
        private DubboCodec codec = new DubboCodec();
    
        @Override
        public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
            int save = buffer.readerIndex();
            MultiMessage result = MultiMessage.create();
            do {
                // codec is a DubboCodec instance
                Object obj = codec.decode(channel, buffer);
                if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                    buffer.readerIndex(save);
                    break;
                } else {
                    result.addMessage(obj);
                    logMessageLength(obj, buffer.readerIndex() - save);
                    save = buffer.readerIndex();
                }
            } while (true);
            if (result.isEmpty()) {
                return Codec2.DecodeResult.NEED_MORE_INPUT;
            }
            if (result.size() == 1) {
                return result.get(0);
            }
            return result;
        }
    }
    
    • DubboCountCodec.decode() calls DubboCodec.decode().
    • DubboCodec.decode() is the decode() method inherited from its parent class, ExchangeCodec.

    ExchangeCodec

    public class ExchangeCodec extends TelnetCodec {
    
        // header length.
        protected static final int HEADER_LENGTH = 16;
        // magic header.
        protected static final short MAGIC = (short) 0xdabb;
        protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
        protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
        // message flag.
        protected static final byte FLAG_REQUEST = (byte) 0x80;
        protected static final byte FLAG_TWOWAY = (byte) 0x40;
        protected static final byte FLAG_EVENT = (byte) 0x20;
        protected static final int SERIALIZATION_MASK = 0x1f;
    
        @Override
        public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
            int readable = buffer.readableBytes();
            byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
            buffer.readBytes(header);
            return decode(channel, buffer, readable, header);
        }
    
        @Override
        protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
            // Check the magic number; if this is not the Dubbo protocol, fall back to the parent class decoder.
            if (readable > 0 && header[0] != MAGIC_HIGH
                    || readable > 1 && header[1] != MAGIC_LOW) {
                int length = header.length;
                if (header.length < readable) {
                    header = Bytes.copyOf(header, readable);
                    buffer.readBytes(header, length, readable - length);
                }
                for (int i = 1; i < header.length - 1; i++) {
                    if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                        buffer.readerIndex(buffer.readerIndex() - header.length + i);
                        header = Bytes.copyOf(header, i);
                        break;
                    }
                }
                return super.decode(channel, buffer, readable, header);
            }
    
            // Parse the Dubbo protocol: a full header is required first.
            if (readable < HEADER_LENGTH) {
                return DecodeResult.NEED_MORE_INPUT;
            }
    
            // Read the body length from the header.
            int len = Bytes.bytes2int(header, 12);
            checkPayload(channel, len);
    
            // Total frame length = header length + body length.
            int tt = len + HEADER_LENGTH;
            if (readable < tt) {
                return DecodeResult.NEED_MORE_INPUT;
            }
    
            // limit input stream.
            ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
    
            try {
                return decodeBody(channel, is, header);
            } finally {
                // related code omitted
            }
        }
    }
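
    The checks at the top of ExchangeCodec.decode() boil down to a three-way decision: the bytes are not Dubbo, the frame is incomplete, or a full frame is available. The sketch below isolates that decision on a plain byte array. FrameCheckSketch and its Status enum are hypothetical names, and the payload limit check (checkPayload) is deliberately left out.

```java
import java.nio.ByteBuffer;

/** Illustrative sketch (not Dubbo code): is a complete Dubbo frame available? */
final class FrameCheckSketch {
    enum Status { NOT_DUBBO, NEED_MORE_INPUT, COMPLETE }

    static final int HEADER_LENGTH = 16;
    static final byte MAGIC_HIGH = (byte) 0xda;
    static final byte MAGIC_LOW = (byte) 0xbb;

    /** Classifies the readable bytes; data.length plays the role of readable. */
    static Status check(byte[] data) {
        int readable = data.length;
        // Same condition as in ExchangeCodec: only compare the bytes that have arrived.
        if (readable > 0 && data[0] != MAGIC_HIGH
                || readable > 1 && data[1] != MAGIC_LOW) {
            return Status.NOT_DUBBO;
        }
        if (readable < HEADER_LENGTH) {
            return Status.NEED_MORE_INPUT;       // header not complete yet
        }
        int len = ByteBuffer.wrap(data).getInt(12); // body length at offsets 12-15
        if (readable < HEADER_LENGTH + len) {
            return Status.NEED_MORE_INPUT;       // body not complete yet
        }
        return Status.COMPLETE;
    }

    public static void main(String[] args) {
        byte[] frame = ByteBuffer.allocate(20)
                .put(0, MAGIC_HIGH).put(1, MAGIC_LOW).putInt(12, 4).array();
        System.out.println(check(frame));
        System.out.println(check(new byte[]{MAGIC_HIGH}));
        System.out.println(check(new byte[]{1, 2, 3}));
    }
}
```

    Note that a single matching magic byte is not rejected: with only one byte readable, the decision is postponed until more input arrives.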
    

    DubboCodec

    public class DubboCodec extends ExchangeCodec implements Codec2 {
    
        @Override
        protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
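            // Read the flag byte and extract the serialization type from it.
            byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
            // Read the request ID.
            long id = Bytes.bytes2long(header, 4);
            if ((flag & FLAG_REQUEST) == 0) {
                // A response: this is the branch the consumer takes.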
                Response res = new Response(id);
                if ((flag & FLAG_EVENT) != 0) {
                    res.setEvent(Response.HEARTBEAT_EVENT);
                }
                // Read the response status.
                byte status = header[3];
                res.setStatus(status);
                try {
                    if (status == Response.OK) {
                        Object data;
                        if (res.isHeartbeat()) {
                            data = decodeHeartbeatData(channel,  CodecSupport.deserialize(channel.getUrl(), is, proto));
                        } else if (res.isEvent()) {
                            data = decodeEventData(channel,  CodecSupport.deserialize(channel.getUrl(), is, proto));
                        } else {
                            DecodeableRpcResult result;
                            // The decode.in.io setting decides when the body is decoded.
                            if (channel.getUrl().getParameter(
                                    Constants.DECODE_IN_IO_THREAD_KEY,
                                    Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                                result = new DecodeableRpcResult(channel, res, is,
                                        (Invocation) getRequestData(id), proto);
                                // Decode immediately, on the IO thread.
                                result.decode();
                            } else {
                                // Copy the raw body bytes and defer decoding until later.
                                result = new DecodeableRpcResult(channel, res,
                                        new UnsafeByteArrayInputStream(readMessageData(is)),
                                        (Invocation) getRequestData(id), proto);
                            }
                            data = result;
                        }
                        res.setResult(data);
                    } else {
                        res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
                    }
                } catch (Throwable t) {
                    // remaining code omitted
                }
                return res;
            } else {
                // decode request.
                // remaining code omitted
            }
        }
    }
    

    DecodeableRpcResult

    public class DecodeableRpcResult extends RpcResult implements Codec, Decodeable {
    
        private Channel channel;
        private byte serializationType;
        private InputStream inputStream;
        private Response response;
        private Invocation invocation;
        private volatile boolean hasDecoded;
    
        public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) {
            Assert.notNull(channel, "channel == null");
            Assert.notNull(response, "response == null");
            Assert.notNull(is, "inputStream == null");
            this.channel = channel;
            this.response = response;
            this.inputStream = is;
            this.invocation = invocation;
            this.serializationType = id;
        }
    
        @Override
        public Object decode(Channel channel, InputStream input) throws IOException {
            // Obtain an ObjectInput for the serialization type carried in the header.
            ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                    .deserialize(channel.getUrl(), input);
            
            byte flag = in.readByte();
            switch (flag) {
                case DubboCodec.RESPONSE_NULL_VALUE:
                    break;
                case DubboCodec.RESPONSE_VALUE:
                    try {
                        Type[] returnType = RpcUtils.getReturnTypes(invocation);
                        setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                                (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                        : in.readObject((Class<?>) returnType[0], returnType[1])));
                    } catch (ClassNotFoundException e) {
                        throw new IOException(StringUtils.toString("Read response data failed.", e));
                    }
                    break;
                case DubboCodec.RESPONSE_WITH_EXCEPTION:
                    try {
                        Object obj = in.readObject();
                        if (obj instanceof Throwable == false)
                            throw new IOException("Response data error, expect Throwable, but get " + obj);
                        setException((Throwable) obj);
                    } catch (ClassNotFoundException e) {
                        throw new IOException(StringUtils.toString("Read response data failed.", e));
                    }
                    break;
                case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
                    try {
                        setAttachments((Map<String, String>) in.readObject(Map.class));
                    } catch (ClassNotFoundException e) {
                        throw new IOException(StringUtils.toString("Read response data failed.", e));
                    }
                    break;
                case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
                    try {
                        Type[] returnType = RpcUtils.getReturnTypes(invocation);
                        setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                                (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                        : in.readObject((Class<?>) returnType[0], returnType[1])));
                        setAttachments((Map<String, String>) in.readObject(Map.class));
                    } catch (ClassNotFoundException e) {
                        throw new IOException(StringUtils.toString("Read response data failed.", e));
                    }
                    break;
                case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
                    try {
                        Object obj = in.readObject();
                        if (obj instanceof Throwable == false)
                            throw new IOException("Response data error, expect Throwable, but get " + obj);
                        setException((Throwable) obj);
                        setAttachments((Map<String, String>) in.readObject(Map.class));
                    } catch (ClassNotFoundException e) {
                        throw new IOException(StringUtils.toString("Read response data failed.", e));
                    }
                    break;
                default:
                    throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', get " + flag);
            }
            if (in instanceof Cleanable) {
                ((Cleanable) in).cleanup();
            }
            return this;
        }
    
        @Override
        public void decode() throws Exception {
            if (!hasDecoded && channel != null && inputStream != null) {
                try {
                    decode(channel, inputStream);
                } catch (Throwable e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode rpc result failed: " + e.getMessage(), e);
                    }
                    response.setStatus(Response.CLIENT_ERROR);
                    response.setErrorMessage(StringUtils.toString(e));
                } finally {
                    hasDecoded = true;
                }
            }
        }
    
    }
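
    Two ideas in DecodeableRpcResult are worth isolating: the first byte of the body selects how the rest is read, and the no-argument decode() runs at most once thanks to the hasDecoded flag. The sketch below shows both on a much smaller scale. It is only an illustration: LazyResultSketch is a hypothetical class, java.io.DataInputStream stands in for Dubbo's ObjectInput, values are plain strings, and only the three basic flags (0, 1, 2) are handled.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Illustrative sketch (not Dubbo code): flag-driven, decode-once result holder. */
final class LazyResultSketch {
    static final byte RESPONSE_WITH_EXCEPTION = 0;
    static final byte RESPONSE_VALUE = 1;
    static final byte RESPONSE_NULL_VALUE = 2;

    private final InputStream input;
    private volatile boolean hasDecoded;
    private int decodeCount;      // how many times the body was actually parsed
    private String value;
    private Throwable exception;

    LazyResultSketch(InputStream input) {
        this.input = input;
    }

    /** Parses the body on the first call; later calls are no-ops. */
    void decode() {
        if (hasDecoded) {
            return;
        }
        try {
            decodeCount++;
            DataInputStream in = new DataInputStream(input);
            byte flag = in.readByte();           // first byte: kind of result
            switch (flag) {
                case RESPONSE_NULL_VALUE:
                    break;                       // nothing follows
                case RESPONSE_VALUE:
                    value = in.readUTF();
                    break;
                case RESPONSE_WITH_EXCEPTION:
                    exception = new RuntimeException(in.readUTF());
                    break;
                default:
                    throw new IOException("Unknown result flag: " + flag);
            }
        } catch (IOException e) {
            exception = e;                       // record the failure instead of rethrowing
        } finally {
            hasDecoded = true;                   // set even on failure, as in the original
        }
    }

    String getValue() { return value; }
    Throwable getException() { return exception; }
    int getDecodeCount() { return decodeCount; }

    public static void main(String[] args) {
        // Flag 1, then "pong" in DataInput UTF format (2-byte length + bytes).
        byte[] body = {1, 0, 4, 'p', 'o', 'n', 'g'};
        LazyResultSketch result = new LazyResultSketch(new ByteArrayInputStream(body));
        result.decode();
        result.decode(); // second call does nothing
        System.out.println(result.getValue() + " decoded " + result.getDecodeCount() + " time(s)");
    }
}
```

    Making decode() idempotent is what allows the same object to be decoded either eagerly on the IO thread or later by whichever thread reaches it first.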
    
