美文网首页Java 杂谈程序员MQ
RocketMQ源码:通信协议设计及编解码

RocketMQ源码:通信协议设计及编解码

作者: 金桔文案 | 来源:发表于2018-09-10 18:19 被阅读27次

本文主要分析RocketMQ通信协议的设计。RocketMQ设计了自己的一个通信协议,用于消息内容和二进制格式之间的转换。

RocketMQ的版本为:4.2.0 release。

一.通信协议的格式
MQ1.jpg

1.length:4字节整数,二三四部分长度总和;

2.header length:4字节整数,第三部分header data长度;

3.header data:存放Json序列化的数据;

4.body data:应用自定义二进制序列化的数据。

二.消息的编码过程

消息的编码是在 RemotingCommand 中 encode 方法中完成的:

public ByteBuffer encode() {
    // 1> header length size
    int length = 4;
    // 2> header data length
    byte[] headerData = this.headerEncode();
    length += headerData.length;
    // 3> body data length
    if (this.body != null) {
        length += body.length;
    }
    ByteBuffer result = ByteBuffer.allocate(4 + length);
    // 1.先放入消息的总大小
    result.putInt(length);
    // 2.再放入头部的长度
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
    // 3.接着放入头部数据
    result.put(headerData);
    // 4.最后放入消息体的数据
    if (this.body != null) {
        result.put(this.body);
    }
    result.flip();
    return result;
}

对于头部数据 - header data 的编码,在 RemotingCommand 中 headerEncode 方法处理:

private byte[] headerEncode() {
    this.makeCustomHeaderToNet();
    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
        return RocketMQSerializable.rocketMQProtocolEncode(this);
    } else {
        return RemotingSerializable.encode(this);
    }
}

还有一个细节的地方,保存 header length 的时候,经过了一个 markProtocolType 的处理,作用是将RPC类型和headerData长度编码放到一个byte[4]数组中:

    public static byte[] markProtocolType(int source, SerializeType type) {
        byte[] result = new byte[4];
        result[0] = type.getCode();
        result[1] = (byte) ((source >> 16) & 0xFF);
        result[2] = (byte) ((source >> 8) & 0xFF);
        result[3] = (byte) (source & 0xFF);
        return result;
    }

记住这个方法,后面从消息数据解码消息头长度的时候,就不会看的很迷茫。

三.消息的解码过程

消息的解码是在类 RemotingCommand 中 decode方法中完成的:

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    int length = byteBuffer.limit();// 获取byteBuffer的总长度
    int oriHeaderLen = byteBuffer.getInt();// 1.获取前4个字节,组装int类型,该长度为总长度 图中 length
    int headerLength = getHeaderLength(oriHeaderLen);// length & 0xFFFFFF 获取消息头的长度,与运算,编码时候的长度即为24位
    byte[] headerData = new byte[headerLength];// 保存header data
    byteBuffer.get(headerData);// 2.从缓冲区中读取headerLength个字节的数据,这个数据就是报文头部的数据
    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
    int bodyLength = length - 4 - headerLength;// 报文体的数据,减去了第二、三部分的长度
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);// 获取消息体的数据
    }
    cmd.body = bodyData;
    return cmd;
}

对于头部数据 - header data 的解码,在 RemotingCommand 中 headerDecode 方法处理:

private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
    switch (type) {
        case JSON:
            RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
            resultJson.setSerializeTypeCurrentRPC(type);
            return resultJson;
        case ROCKETMQ:
            RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
            resultRMQ.setSerializeTypeCurrentRPC(type);
            return resultRMQ;
        default:
            break;
    }
    return null;
}

解码出 header length - 消息头长度,在 getHeaderLength 中:

public static int getHeaderLength(int length) {
    return length & 0xFFFFFF;// 为什么是和高位的24位与,可以参考编码时的方法:markProtocolType
}

通过代码分析,发现RocketMQ的协议设计不是很复杂,只要我们耐心一步步跟进去,看到最里面的代码。"哦,原来是这样!"

文章来源:https://my.oschina.net/javamaster/blog/2001963

推荐阅读:https://www.roncoo.com/article/index?title=MQ

相关文章

网友评论

    本文标题:RocketMQ源码:通信协议设计及编解码

    本文链接:https://www.haomeiwen.com/subject/dzfegftx.html