美文网首页
Netty+ProtoBuf传输多种类型对象的两种方式

Netty+ProtoBuf传输多种类型对象的两种方式

作者: Wannay | 来源:发表于2021-10-14 15:14 被阅读0次

    1.传统方式发送对象数据的方式

    加上这个解码器进行解码

    pipeline.addLast(new ProtobufDecoder(UserPOJO.User.getDefaultInstance()));
    

    加上下面这个编码器进行编码

     pipeline.addLast(new ProtobufEncoder());
    

    但是这样的方式有一个很大的弊端,那就是他并不能解析多种类型的ProtoBuf传输的对象,因为UserPOJO.User.getDefaultInstance()中已经写死了解码器可以解析的对象类型,那么我们该如何去做呢?

    可以自定义协议+自定义ProtoBuf编解码器去进行解决,也可以使用oneof解决。

    2.使用自定义ProtoBuf编码器的方式去实现传输多种类型的对象

    我们定义如下的message,编写这样的一个msg.proto文件

    //proto版本,有proto2和proto3两个版本,支持的语言不同,支持的语法也不同,两个版本之间存在了很多的不同之处
    syntax = "proto3";
    
    package com.wanna.webflux.nio.netty.buffer.protobuf;  // 指定包名,主要用来保证命名空间冲突
    
    // 指定生成的外部类名(生成的.java文件的文件名)
    option java_outer_classname = "UserPOJO";
    
    // 如果没有指定java_package,那么默认会采用package作为java的包名
    // 就算指定了java_package也得指定package,因为其它语言可能没有包的概念,就有可能导致命名空间冲突
    option java_package = "com.wanna.webflux.nio.netty.buffer.protobuf";
    
    // protobuf 使用message管理数据
    // 使用`protoc --java_out=/Users/wanna/Desktop/Code/java/WebFlux/src/main/java msg.proto`进行构建出来一个.java文件
    // --java_out文件指定的是包的base路径,package是生成的包
    message User {
    
      // protobuf本身是不支持继承的,所以别尝试去寻找一个继承的做法
    
      // option代表该字段是可选的
      // 一般可用的修饰符有required/optional/repeated这三个,在proto3中已经不再支持使用required修饰符
      // required是必须指定的字段,不指定会抛异常,不推荐使用,从required转到option是存在问题的
      // optional代表该字段是一个可选的字段,不是必须指定的,推荐使用
      // repeated代表该字段可以重复出现(0次以上),相当于一个动态数组(List)
      optional int32 id = 1;  // 1代表字段的序号
      optional string name = 2;  // 2代表字段的序号
    
      // 定义一个枚举类型
      enum PhoneType {
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
      }
    
      // 定义一个message,也就是对应Java的一个类
      message PhoneNumber {
        optional string number = 1;
    
        // 在proto2中支持[default = HOME]这样的方式去指定默认值
        // 在proto3中已经不再支持该语法了
        optional PhoneType type = 2;
      }
    
      // List,每个元素是一个PhoneNumber类型的
      repeated PhoneNumber phones = 4;
    }
    
    message Student {
      optional int32 id = 1;
      optional string name = 2;
    }
    
    // 要构建一个Message,必须使用Message的构建器去进行set,最终调用build方法去完成消息的构建
    // 一旦构建好了一个Message对象,那么该对象就是不可变的,不能再进行修改
    

    我们先封装一个协议

    msg长度的低8bit | msg长度的高8bit | unused | 对象类型 | ...msg...
    

    我们自己封装一个header,其中高两字节代表长度,最低字节代表对象的类型。

    我们先来分析一下,2个byte去定义长度,因此可以表示对象的最大长度为65535B,也就是64KB;1byte定义类型,可以有255种类型,暂时勉强够用。通过长度还能判断一下后面传输的对象的长度,还能解决TCP的粘包问题,足够使用。

    我们先定义一个常量类

    public class MessageProtocolConstants {
    
        public static int BYTES_OF_HEADER = 4;  //header的长度
    
        public static int INDEX_OF_HEADER_LOW_8_BITS = 0;  //在header中低8位所在的索引
    
        public static int INDEX_OF_HEADER_HIGH_8_BITS = 1;  //在header中高8位所在的索引
    
        public static int INDEX_OF_UNUSED = 2;   //在header中尚未使用的bit
    
        public static int INDEX_OF_HEADER_TYPE = 3;  //在header中type所在的索引
    
        public static int MASK_OF_LOW_8_BITS = 0x00ff;  //从header中提取出来长度的低8Bit
    
        public static int MASK_OF_HIGH_8_BITS = 0xff00;  //从header中提取出来长度的高8Bit
    
        public static byte HEADER_OF_TYPE_USER = 0x01;  //User类型的Type枚举值
    
        public static byte HEADER_OF_TYPE_STUDENT = 0x02;  //Student类型的Type枚举值
    }
    

    下面是自定义的编码器

    public class CustomProtoBufEncoder extends MessageToByteEncoder<MessageLite> {
        @Override
        protected void encode(ChannelHandlerContext ctx, MessageLite msg, ByteBuf out) throws Exception {
            byte[] body = msg.toByteArray();  //将消息转成字节数组
            byte[] header = generateHeader(msg, (short) body.length);  //根据Type生成header
    
            out.writeBytes(header);  //写header
            out.writeBytes(body);  //写body
        }
    
        //short占用2Bytes,有16个bit,可以代表的最大的数是65535,因此可以代表最大的长度为64KB,应该足够一个对象的传输了
        private byte[] generateHeader(MessageLite msg, short length) throws Exception {
            byte typeOfObject;  //要进行传输的对象的类型
            if (msg instanceof UserPOJO.User) {
                typeOfObject = MessageProtocolConstants.HEADER_OF_TYPE_USER;
            } else if (msg instanceof UserPOJO.Student) {
                typeOfObject = MessageProtocolConstants.HEADER_OF_TYPE_STUDENT;
            } else {
                throw new NotSupportedException("不支持" + msg.getClass() + "类型该对象的传输");
            }
            byte[] header = new byte[MessageProtocolConstants.BYTES_OF_HEADER];
            //byte[0..1]表示消息的长度,byte[2]暂时保留不进行使用,byte[3]存放类型
            header[MessageProtocolConstants.INDEX_OF_HEADER_LOW_8_BITS] = (byte) (length & MessageProtocolConstants.MASK_OF_LOW_8_BITS);  //低8位长度
            header[MessageProtocolConstants.INDEX_OF_HEADER_HIGH_8_BITS] = (byte) (length & MessageProtocolConstants.MASK_OF_HIGH_8_BITS);  //高8位长度
            header[MessageProtocolConstants.INDEX_OF_HEADER_TYPE] = typeOfObject;  //对象类型
            return header;
        }
    }
    

    下面是自定义的解码器

    public class CustomProtoBufDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            //如果可读的字节数量还不够头部的长度,那么直接return...
            if (in.readableBytes() <= MessageProtocolConstants.BYTES_OF_HEADER) {
                return;
            }
            byte[] header = new byte[MessageProtocolConstants.BYTES_OF_HEADER];
            in.readBytes(header); //读取header
            int length = (header[MessageProtocolConstants.INDEX_OF_HEADER_HIGH_8_BITS] << 8)
                    + header[MessageProtocolConstants.INDEX_OF_HEADER_LOW_8_BITS];  //高8bit左移8位+低8bit得到长度
            byte type = header[MessageProtocolConstants.INDEX_OF_HEADER_TYPE];  //解析传递的出来type类型
            byte[] body = new byte[length]; //创建指定长度的byte[]用来去进行读取body
            in.readBytes(body);  //读取body...
            MessageLite object = getObject(body, type);  //解析对象
            out.add(object);  //加入到结果列表当中
        }
    
        private MessageLite getObject(byte[] body, byte type) throws Exception {
            if (type == MessageProtocolConstants.HEADER_OF_TYPE_USER) {
                return UserPOJO.User.getDefaultInstance().getParserForType().parseFrom(body);
            } else if (type == MessageProtocolConstants.HEADER_OF_TYPE_STUDENT) {
                return UserPOJO.Student.getDefaultInstance().getParserForType().parseFrom(body);
            }
            throw new UnsupportedOperationException("不支持解析该类型的对象");
        }
    }
    

    3. 使用ProtoBuf提供的oneof实现

    编写如下的.proto文件

    syntax = "proto3";
    
    package com.wanna.webflux.nio.netty.buffer.protobuf;
    option java_outer_classname = "UserPOJO2";
    option java_package = "com.wanna.webflux.nio.netty.buffer.protobuf";
    
    
    // 把要传递的对象全部都包装到ObjectMessage对象当中
    message ObjectMessage {
      enum DataType {
        UserType = 0;
        StudentType = 1;
      }
    
      // 数据类型,是个枚举值
      optional DataType type = 1;
    
      // oneof,只能使用和传递其中一个对象
      oneof body {
        User user = 2;
        Student student = 3;
      }
    
    }
    
    message User {
      optional int32 id = 1;
      optional string name = 2;
    
      enum PhoneType {
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
      }
    
      message PhoneNumber {
        optional string number = 1;
        optional PhoneType type = 2;
      }
    
      repeated PhoneNumber phones = 4;
    }
    
    message Student {
      optional int32 id = 1;
      optional string name = 2;
    }
    

    在传输对象时定义一个枚举类型的Type,再使用oneof去指定只能传递一个类型的对象,这样就在读取数据的时候再根据枚举类型进行处理就可以了。

    在传输时使用如下的方式进行传输,再使用传统的方式加入ProtoBufEncoder和ProtoBufDecoder就可以进行传输。

                UserPOJO2.ObjectMessage message = UserPOJO2.ObjectMessage.newBuilder().setUser(
                        UserPOJO2.User.newBuilder().setId(1).setName("wanna").build()
                ).setType(UserPOJO2.ObjectMessage.DataType.UserType).build();
    
                ctx.writeAndFlush(message);
    

    在接收端根据枚举去判断对应类型的对象,去进行处理。

                if (msg instanceof UserPOJO2.ObjectMessage) {
                    UserPOJO2.ObjectMessage message = (UserPOJO2.ObjectMessage) msg;
                    UserPOJO2.ObjectMessage.DataType type = message.getType();
                    if (type == UserPOJO2.ObjectMessage.DataType.StudentType) {
                        //....
                    } else if (type == UserPOJO2.ObjectMessage.DataType.UserType) {
                        //....
                    } else {
                        
                    }
                }
    

    相关文章

      网友评论

          本文标题:Netty+ProtoBuf传输多种类型对象的两种方式

          本文链接:https://www.haomeiwen.com/subject/plcuoltx.html