美文网首页rocketMq
rocketmq rpc 通信(一) 编解码

rocketmq rpc 通信(一) 编解码

作者: 左小星 | 来源:发表于2019-08-03 12:52 被阅读0次

  消息中间件的性能吞吐量如何,关键是看底层存储如何做到极限,另一个重要因素就是网络io通信。接下来将从消息编解码,rpc通信介绍rocketMq的网络io~

rocketmq 通信模块代码结构

  如图所示,rocketMq网络通信是基于netty实现的,大家需要对java的nio和各种buffer和多线程的知识有一定的了解。


image.png

RemotingCommand中重要的几个变量

name 类型 说明
code int 请求操作码,服务端拿到相应的操作码做不同的处理
language byte 语言
version int 版本号
opaque int 客户端请求id,因为netty是异步编程,服务端响应中做拓传到客户端中,找到相应的请求
flag int rpc标示,是同步、异步、oneway
remark String 额外存储信息的
extFields Map 存放header头信息中的成员变量
serializeTypeCurrentRPC SerializeType 表示使用json还是rocketMq协议序列化和反序列化头信息

rocketMq编解码说明

  先说明一下rocketMq的消息体组成如图所示,rocketMq提供了两种 消息头 序列化方式,一种是json,一种是rocketMq自定义协议序列化方式,之后会说明一下自定义协议序列化方式。

消息体组成

  rocketMq反序列化是如何知道是那种序列化方式呢,答案就在headerLength中。
headerLength数据类型是int类型,第一个字节存储了消息头的序列化方式,后3个字节存储了消息头的长度。

消息头序列化:
public static byte[] markProtocolType(int source, SerializeType type) {
        byte[] result = new byte[4];
        // SerializeType 是一个byte类型,0 表示json协议,1表示rocketMq协议
        result[0] = type.getCode();
        result[1] = (byte) ((source >> 16) & 0xFF);
        result[2] = (byte) ((source >> 8) & 0xFF);
        result[3] = (byte) (source & 0xFF);
        return result;
    }

消息头反序列化获取header类型:

public static SerializeType getProtocolType(int source) {
        // 因为第4个字节是header的类型,所以要右移24个比特位
        return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
    }

再来看下有header长度和header类型如何获取header体的长度

public static int getHeaderLength(int length) {
      // 0xFFFFFF转换成byte数组是 [0,-1,-1,-1] ,int的后三个字节比特位全是1
      // byte 最高位1表示负数,而 11111111 8个比特位全是1 是 (byte)-1 的补码
        return length & 0xFFFFFF;
    }

rocketMq作为一个消息中间件,消息头有很多种,所以单独抽象出一个CommandCustomHeader接口,消息头的子类实现这个接口即可,接下来看一下接口定义

public interface CommandCustomHeader {
   // 子类的成员变量必须是java的基本类型,这个方法就是校验子类成员变量是否为空的
    void checkFields() throws RemotingCommandException;
}

接下来具体看下rocketMq是如何encode消息的

public ByteBuffer encode() {
        // 1> header length size
        int length = 4;
        // 2> header data length
        byte[] headerData = this.headerEncode();
        length += headerData.length;
        // 3> body data length
        if (this.body != null) {
            length += body.length;
        }
        ByteBuffer result = ByteBuffer.allocate(4 + length);
        // length
        result.putInt(length);
        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
        // header data
        result.put(headerData);
        // body data;
        if (this.body != null) {
            result.put(this.body);
        }
        result.flip();
        return result;
    }

①:length 代表消息的总长度,初始值为4表示了消息头的长度
②:将消息头转换为byte数组,分为json和rocketMq协议,会在下面具体分析
③:ByteBuffer.allocate(4 + length) 分配buffer堆内内存,前面length初始值为4,这里又加了4,因为有两个长度信息。
④:之后再把长度信息和header 、body push到内存中
⑤:之后在flip,转换为读模式,需要对ByteBuffer有一定的了解
说明:
java nio 中的ByteBuffer中有几个重要变量,limit、position、capacity。一开始new 一个buffer时,limit是等于capacity是处于写模式下,向buffer里put数据 position是逐渐递增的 不能超过limit的值。写了几个数据后我想从头开始读我写了哪些东西,这时做的一个操作是flip,其实就是将limit设置成了position的值,position 设置成0,get时position是递增的,直到小于limit。

// ByteBuffer的基本操作
ByteBuffer buffer = ByteBuffer.allocate(12);
        buffer.putInt(1);
        buffer.putInt(2);
        buffer.flip();
        System.out.println(buffer.getInt());
        System.out.println(buffer.getInt());
// flip方法也可以转换为如下方式
ByteBuffer buffer = ByteBuffer.allocate(12);
        buffer.putInt(1);
        buffer.putInt(2);
//        buffer.flip();
        buffer.limit(8);
        buffer.position(0);
        System.out.println(buffer.getInt());
        System.out.println(buffer.getInt());
rocketMq协议序列化头

前面说了rocketMq是有头信息的,头信息里面是java的基本变量,其实就是根据反射将变量信息先放在map中,然后在将map里面的k v 转换成byte 放到buffer中

// 存放header成员变量k v 值的map
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
// 这个方法就是通过反射先把customHeader中的成员变量放到map中
public void makeCustomHeaderToNet() {
        if (this.customHeader != null) {
            Field[] fields = getClazzFields(customHeader.getClass());
            if (null == this.extFields) {
                this.extFields = new HashMap<String, String>();
            }
            for (Field field : fields) {
                if (!Modifier.isStatic(field.getModifiers())) {
                    String name = field.getName();
                    if (!name.startsWith("this")) {
                        Object value = null;
                        try {
                            field.setAccessible(true);
                            value = field.get(this.customHeader);
                        } catch (Exception e) {
                            log.error("Failed to access field [{}]", name, e);
                        }
                        if (value != null) {
                            this.extFields.put(name, value.toString());
                        }
                    }
                }
            }
        }
    }

下面说明一下rocketMq形式的encode如何将header转换成byte~,直接上代码

public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
        // String remark
        byte[] remarkBytes = null;
        int remarkLen = 0;
        if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
            remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
            remarkLen = remarkBytes.length;
        }
        // HashMap<String, String> extFields
        byte[] extFieldsBytes = null;
        int extLen = 0;
        if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
            extFieldsBytes = mapSerialize(cmd.getExtFields());
            extLen = extFieldsBytes.length;
        }
        int totalLen = calTotalLen(remarkLen, extLen);
        ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
        // int code(~32767)
        headerBuffer.putShort((short) cmd.getCode());
        // LanguageCode language
        headerBuffer.put(cmd.getLanguage().getCode());
        // int version(~32767)
        headerBuffer.putShort((short) cmd.getVersion());
        // int opaque
        headerBuffer.putInt(cmd.getOpaque());
        // int flag
        headerBuffer.putInt(cmd.getFlag());
        // String remark
        if (remarkBytes != null) {
            headerBuffer.putInt(remarkBytes.length);
            headerBuffer.put(remarkBytes);
        } else {
            headerBuffer.putInt(0);
        }
        // HashMap<String, String> extFields;
        if (extFieldsBytes != null) {
            headerBuffer.putInt(extFieldsBytes.length);
            headerBuffer.put(extFieldsBytes);
        } else {
            headerBuffer.putInt(0);
        }
        return headerBuffer.array();
    }

①:将remark转换成字节,remark就是RemotingCommand的一个成员变量
②:下面重点来了,将customHeader通过序列化放到extFields转换成byte数组,接下来会说明
③:接下来计算header的长度,公式为,里面的变量意思详见上面表格

private static int calTotalLen(int remark, int ext) {
        // int code(~32767)
        int length = 2
            // LanguageCode language
            + 1
            // int version(~32767)
            + 2
            // int opaque
            + 4
            // int flag
            + 4
            // String remark
            + 4 + remark
            // HashMap<String, String> extFields
            + 4 + ext;
        return length;
    }

④:初始化一个buffer,然后一个一个将值put进去。
接下来看下rocketMq如何将HashMap序列化成byte数组

public static byte[] mapSerialize(HashMap<String, String> map) {
        // keySize+key+valSize+val
        if (null == map || map.isEmpty())
            return null;
        int totalLength = 0;
        int kvLength;
        Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getKey() != null && entry.getValue() != null) {
                kvLength =
                    // keySize + Key
                    2 + entry.getKey().getBytes(CHARSET_UTF8).length
                        // valSize + val
                        + 4 + entry.getValue().getBytes(CHARSET_UTF8).length;
                totalLength += kvLength;
            }
        }
        ByteBuffer content = ByteBuffer.allocate(totalLength);
        byte[] key;
        byte[] val;
        it = map.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getKey() != null && entry.getValue() != null) {
                key = entry.getKey().getBytes(CHARSET_UTF8);
                val = entry.getValue().getBytes(CHARSET_UTF8);
                content.putShort((short) key.length);
                content.put(key);
                content.putInt(val.length);
                content.put(val);
            }
        }
        return content.array();
    }

说明:其实就是拿到map的迭代器,把key和value按顺序放入到byte数组中,key的长度为short类型,value的长度为int类型,先计算出总长度,然后一个一个的放到buffer里面。

json协议序列化header信息

  json协议序列化就十分简单了,直接把序列化后的json字符串获取byte返回。

// body 和 customHeader 被 transient修饰,json序列化和反序列话不会包括这俩字段
private transient byte[] body;
private transient CommandCustomHeader customHeader;

RemotingSerializable.encode(RemotingCommand.this);
// encode 调用这个方法
public static byte[] encode(final Object obj) {
        final String json = toJson(obj, false);
        if (json != null) {
            return json.getBytes(CHARSET_UTF8);
        }
        return null;
    }

rocketMq反序列化

接下来看一下反序列化如何实现的

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        // 整个消息体的长度,netty的LengthFieldBasedFrameDecoder处理粘包拆包时,已经把消息的总长度4个字节跳过
        int length = byteBuffer.limit();
        // 带上协议的header长度
        int oriHeaderLen = byteBuffer.getInt();
       // 去掉协议的header长度
        int headerLength = getHeaderLength(oriHeaderLen);
        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);
        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;
        return cmd;
    }

 // 下面是headerDecode 的相关代码,只是说明rocketMq header协议的代码
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
        RemotingCommand cmd = new RemotingCommand();
        ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
        // int code(~32767)
        cmd.setCode(headerBuffer.getShort());
        // LanguageCode language
        cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
        // int version(~32767)
        cmd.setVersion(headerBuffer.getShort());
        // int opaque
        cmd.setOpaque(headerBuffer.getInt());
        // int flag
        cmd.setFlag(headerBuffer.getInt());
        // String remark
        int remarkLength = headerBuffer.getInt();
        if (remarkLength > 0) {
            byte[] remarkContent = new byte[remarkLength];
            headerBuffer.get(remarkContent);
            cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
        }
        // HashMap<String, String> extFields
        int extFieldsLength = headerBuffer.getInt();
        if (extFieldsLength > 0) {
            byte[] extFieldsBytes = new byte[extFieldsLength];
            headerBuffer.get(extFieldsBytes);
            // 重点看下如何将byte反序列化成map
            cmd.setExtFields(mapDeserialize(extFieldsBytes));
        }
        return cmd;
    }

public static HashMap<String, String> mapDeserialize(byte[] bytes) {
        if (bytes == null || bytes.length <= 0)
            return null;
        HashMap<String, String> map = new HashMap<String, String>();
        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
        short keySize;
        byte[] keyContent;
        int valSize;
        byte[] valContent;
        while (byteBuffer.hasRemaining()) {
            keySize = byteBuffer.getShort();
            keyContent = new byte[keySize];
            byteBuffer.get(keyContent);
            valSize = byteBuffer.getInt();
            valContent = new byte[valSize];
            byteBuffer.get(valContent);
            map.put(new String(keyContent, CHARSET_UTF8), new String(valContent, CHARSET_UTF8));
        }
        return map;
    }

说明:rocketmq使用netty自带的LengthFieldBasedFrameDecoder来解码,其底层帮我们处理好了粘包和拆包的问题,而在NettyDecoder的构造方法中把 消息总长度的length去掉了 如代码所示

public NettyDecoder() {
       // 第一个0 表示 lengthField 的offset,第一个4表示lengthField的length,最后面的4 表示跳过4个字节
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

将extFields转换成map其实就是encode的一个逆过程,当buffer里面有数据时,先获取k v的长度,然后在声明一个长度的byte数组把对应的value取出来,之后在put到map中。

总结

  rocketMq的rpc通信使用netty,需要了解下netty底层的线程模型java nio 和 netty ByteBuf的相关知识,编解码其实就是定义好一个通信协议,根据协议将对象转换成byte,byte转换成对象的过程,在接下来的一篇文章里,介绍一下如何使用netty做 sync 、async、oneway形式的rpc通信的。以上就是通过看rocketMq rpc模块编解码相关代码总结出来的,如有错误,欢迎指出讨论~

相关文章

网友评论

    本文标题:rocketmq rpc 通信(一) 编解码

    本文链接:https://www.haomeiwen.com/subject/uxoddctx.html