美文网首页
基于Netty实现自定义消息通信协议(协议设计及解析应用实战)

基于Netty实现自定义消息通信协议(协议设计及解析应用实战)

作者: 跟着Mic学架构 | 来源:发表于2021-11-15 11:06 被阅读0次

    所谓的协议,是由语法、语义、时序这三个要素组成的一种规范,通信双方按照该协议规范来实现网络数据传输,这样通信双方才能实现数据正常通信和解析。

    由于不同的中间件在功能方面有一定差异,所以其实应该是没有一种标准化协议来满足不同差异化需求,因此很多中间件都会定义自己的通信协议,另外通信协议可以解决粘包和拆包问题。

    在本篇文章中,我们来实现一个自定义消息协议。

    自定义协议的要素

    自定义协议,那这个协议必须要有组成的元素,

    • 魔数: 用来判断数据包的有效性
    • 版本号: 可以支持协议升级
    • 序列化算法: 消息正文采用什么样的序列化和反序列化方式,比如json、protobuf、hessian等
    • 指令类型:也就是当前发送的是一个什么类型的消息,像zookeeper中,它传递了一个Type
    • 请求序号: 基于双工协议,提供异步能力,也就是收到的异步消息需要找到前面的通信请求进行响应处理
    • 消息长度
    • 消息正文

    协议定义

    sessionId | reqType | Content-Length | Content |
    

    其中Version,Content-Length,SessionId就是Header信息,Content就是交互的主体。

    定义项目结构以及引入包

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    

    项目结构如图4-1所示:

    • netty-message-mic : 表示协议模块。
    • netty-message-server :表示nettyserver。

    <center>图4-1</center>

    • 引入log4j.properties

    在nettyMessage-mic中,包的结构如下。

    image-20210831103346370

    定义Header

    表示消息头

    @Data
    public class Header{
        private long sessionId; //会话id  : 占8个字节
        private byte type; //消息类型: 占1个字节
    
        private int length;     //消息长度 : 占4个字节
    }
    

    定义MessageRecord

    表示消息体

    @Data
    public class MessageRecord{
    
        private Header header;
        private Object body;
    }
    

    OpCode

    定义操作类型

    public enum OpCode {
    
        BUSI_REQ((byte)0),
        BUSI_RESP((byte)1),
        PING((byte)3),
        PONG((byte)4);
    
        private byte code;
    
        private OpCode(byte code) {
            this.code=code;
        }
    
        public byte code(){
            return this.code;
        }
    }
    

    定义编解码器

    分别定义对该消息协议的编解码器

    MessageRecordEncoder

    @Slf4j
    public class MessageRecordEncoder extends MessageToByteEncoder<MessageRecord> {
    
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, MessageRecord record, ByteBuf byteBuf) throws Exception {
            log.info("===========开始编码Header部分===========");
            Header header=record.getHeader();
            byteBuf.writeLong(header.getSessionId()); //保存8个字节的sessionId
            byteBuf.writeByte(header.getType());  //写入1个字节的请求类型
    
            log.info("===========开始编码Body部分===========");
            Object body=record.getBody();
            if(body!=null){
                ByteArrayOutputStream bos=new ByteArrayOutputStream();
                ObjectOutputStream oos=new ObjectOutputStream(bos);
                oos.writeObject(body);
                byte[] bytes=bos.toByteArray();
                byteBuf.writeInt(bytes.length); //写入消息体长度:占4个字节
                byteBuf.writeBytes(bytes); //写入消息体内容
            }else{
                byteBuf.writeInt(0); //写入消息长度占4个字节,长度为0
            }
        }
    }
    

    MessageRecordDecode

    @Slf4j
    public class MessageRecordDecode extends ByteToMessageDecoder {
    
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            MessageRecord record=new MessageRecord();
            Header header=new Header();
            header.setSessionId(byteBuf.readLong());  //读取8个字节的sessionid
            header.setType(byteBuf.readByte()); //读取一个字节的操作类型
            record.setHeader(header);
            //如果byteBuf剩下的长度还有大于4个字节,说明body不为空
            if(byteBuf.readableBytes()>4){
                int length=byteBuf.readInt(); //读取四个字节的长度
                header.setLength(length);
                byte[] contents=new byte[length];
                byteBuf.readBytes(contents,0,length);
                ByteArrayInputStream bis=new ByteArrayInputStream(contents);
                ObjectInputStream ois=new ObjectInputStream(bis);
                record.setBody(ois.readObject());
                list.add(record);
                log.info("序列化出来的结果:"+record);
            }else{
                log.error("消息内容为空");
            }
        }
    }
    

    测试协议的解析和编码

    EmbeddedChannel是netty专门改进针对ChannelHandler的单元测试而提供的

    public class CodesMainTest {
        public static void main( String[] args ) throws Exception {
            EmbeddedChannel channel=new EmbeddedChannel(
                new LoggingHandler(),
                new MessageRecordEncoder(),
                new MessageRecordDecode());
            Header header=new Header();
            header.setSessionId(123456);
            header.setType(OpCode.PING.code());
            MessageRecord record=new MessageRecord();
            record.setBody("Hello World");
            record.setHeader(header);
            channel.writeOutbound(record);
    
            ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
            new MessageRecordEncoder().encode(null,record,buf);
            channel.writeInbound(buf);
        }
    }
    

    编码包分析

    运行上述代码后,会得到下面的一个信息

             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 00 00 00 00 00 01 e2 40 03 00 00 00 12 ac ed 00 |.......@........|
    |00000010| 05 74 00 0b 48 65 6c 6c 6f 20 57 6f 72 6c 64    |.t..Hello World |
    +--------+-------------------------------------------------+----------------+
    

    按照协议规范:

    • 前面8个字节表示sessionId
    • 一个字节表示请求类型
    • 4个字节表示长度
    • 后面部分内容表示消息体

    测试粘包和半包问题

    通过slice方法进行拆分,得到两个包。

    ByteBuf中提供了一个slice方法,这个方法可以在不做数据拷贝的情况下对原始ByteBuf进行拆分。

    public class CodesMainTest {
        public static void main( String[] args ) throws Exception {
            //EmbeddedChannel是netty专门针对ChannelHandler的单元测试而提供的类。可以通过这个类来测试channel输入入站和出站的实现
            EmbeddedChannel channel=new EmbeddedChannel(
                    //解决粘包和半包问题
    //                new LengthFieldBasedFrameDecoder(2048,10,4,0,0),
                    new LoggingHandler(),
                    new MessageRecordEncoder(),
                    new MessageRecordDecode());
            Header header=new Header();
            header.setSessionId(123456);
            header.setType(OpCode.PING.code());
            MessageRecord record=new MessageRecord();
            record.setBody("Hello World");
            record.setHeader(header);
            channel.writeOutbound(record);
    
            ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
            new MessageRecordEncoder().encode(null,record,buf);
    
           //*********模拟半包和粘包问题************//
            //把一个包通过slice拆分成两个部分
            ByteBuf bb1=buf.slice(0,7); //获取前面7个字节
            ByteBuf bb2=buf.slice(7,buf.readableBytes()-7); //获取后面的字节
            bb1.retain();
    
            channel.writeInbound(bb1);
            channel.writeInbound(bb2);
        }
    }
    

    运行上述代码会得到如下异常, readerIndex(0) +length(8)表示要读取8个字节,但是只收到7个字节,所以直接报错。

    2021-08-31 15:53:01,385 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ: 7B
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 00 00 00 00 00 01 e2                            |.......         |
    +--------+-------------------------------------------------+----------------+
    2021-08-31 15:53:01,397 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
    Exception in thread "main" io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8) exceeds writerIndex(7): UnpooledSlicedByteBuf(ridx: 0, widx: 7, cap: 7/7, unwrapped: PooledUnsafeDirectByteBuf(ridx: 0, widx: 31, cap: 256))
    

    解决拆包问题

    LengthFieldBasedFrameDecoder是长度域解码器,它是解决拆包粘包最常用的解码器,基本上能覆盖大部分基于长度拆包的场景。其中开源的消息中间件RocketMQ就是使用该解码器进行解码的。

    首先来说明一下该解码器的核心参数

    • lengthFieldOffset,长度字段的偏移量,也就是存放长度数据的起始位置
    • lengthFieldLength,长度字段锁占用的字节数
    • lengthAdjustment,在一些较为复杂的协议设计中,长度域不仅仅包含消息的长度,还包含其他数据比如版本号、数据类型、数据状态等,这个时候我们可以使用lengthAdjustment进行修正,它的值=包体的长度值-长度域的值
    • initialBytesToStrip,解码后需要跳过的初始字节数,也就是消息内容字段的起始位置
    • lengthFieldEndOffset,长度字段结束的偏移量, 该属性的值=lengthFieldOffset+lengthFieldLength
    public class CodesMainTest {
        public static void main( String[] args ) throws Exception {
            EmbeddedChannel channel=new EmbeddedChannel(
                    //解决粘包和半包问题
                    new LengthFieldBasedFrameDecoder(1024,
                            9,4,0,0),
                    new LoggingHandler(),
                    new MessageRecordEncoder(),
                    new MessageRecordDecode());
            Header header=new Header();
            header.setSessionId(123456);
            header.setType(OpCode.PING.code());
            MessageRecord record=new MessageRecord();
            record.setBody("Hello World");
            record.setHeader(header);
            channel.writeOutbound(record);
    
            ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
            new MessageRecordEncoder().encode(null,record,buf);
    
           //*********模拟半包和粘包问题************//
            //把一个包通过slice拆分成两个部分
            ByteBuf bb1=buf.slice(0,7);
            ByteBuf bb2=buf.slice(7,buf.readableBytes()-7);
            bb1.retain();
    
            channel.writeInbound(bb1);
            channel.writeInbound(bb2);
        }
    }
    

    添加一个长度解码器,就解决了拆包带来的问题。运行结果如下

    2021-08-31 16:09:35,115 [com.netty.example.codec.MessageRecordDecode]-[INFO] 序列化出来的结果:MessageRecord(header=Header(sessionId=123456, type=3, length=18), body=Hello World)
    2021-08-31 16:09:35,116 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
    

    基于自定义消息协议通信

    下面我们把整个通信过程编写完整,代码结构如图4-2所示.

    image-20210831175056500

    <center>图4-2</center>

    服务端开发

    @Slf4j
    public class ProtocolServer {
    
        public static void main(String[] args){
            EventLoopGroup boss = new NioEventLoopGroup();
            //2 用于对接受客户端连接读写操作的线程工作组
            EventLoopGroup work = new NioEventLoopGroup();
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, work) //绑定两个工作线程组
                .channel(NioServerSocketChannel.class)  //设置NIO的模式
                // 初始化绑定服务通道
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline()
                            .addLast(
                            new LengthFieldBasedFrameDecoder(1024,
                                                             9,4,0,0))
                            .addLast(new MessageRecordEncoder())
                            .addLast(new MessageRecordDecode())
                            .addLast(new ServerHandler());
                    }
                });
            ChannelFuture cf= null;
            try {
                cf = b.bind(8080).sync();
                log.info("ProtocolServer start success");
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                work.shutdownGracefully();
                boss.shutdownGracefully();
            }
        }
    }
    

    ServerHandler

    @Slf4j
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            MessageRecord messageRecord=(MessageRecord)msg;
            log.info("server receive message:"+messageRecord);
            MessageRecord res=new MessageRecord();
            Header header=new Header();
            header.setSessionId(messageRecord.getHeader().getSessionId());
            header.setType(OpCode.BUSI_RESP.code());
            String message="Server Response Message!";
            res.setBody(message);
            header.setLength(message.length());
            ctx.writeAndFlush(res);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("服务器读取数据异常");
            super.exceptionCaught(ctx, cause);
            ctx.close();
        }
    }
    

    客户端开发

    public class ProtocolClient {
    
        public static void main(String[] args) {
            //创建工作线程组
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,
                                                                               9,4,0,0))
                            .addLast(new MessageRecordEncoder())
                            .addLast(new MessageRecordDecode())
                            .addLast(new ClientHandler());
    
                    }
                });
            // 发起异步连接操作
            try {
                ChannelFuture future = b.connect(new InetSocketAddress("localhost", 8080)).sync();
                Channel c = future.channel();
                for (int i = 0; i < 500; i++) {
                    MessageRecord message = new MessageRecord();
                    Header header = new Header();
                    header.setSessionId(10001);
                    header.setType((byte) OpCode.BUSI_REQ.code());
                    message.setHeader(header);
                    String context="我是请求数据"+i;
                    header.setLength(context.length());
                    message.setBody(context);
                    c.writeAndFlush(message);
                }
                //closeFuture().sync()就是让当前线程(即主线程)同步等待Netty server的close事件,Netty server的channel close后,主线程才会继续往下执行。closeFuture()在channel close的时候会通知当前线程。
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                group.shutdownGracefully();
            }
        }
    }
    

    ClientHandler

    @Slf4j
    public class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            MessageRecord record=(MessageRecord)msg;
            log.info("Client Receive message:"+record);
            super.channelRead(ctx, msg);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
            ctx.close();
        }
    }
    

    相关文章

      网友评论

          本文标题:基于Netty实现自定义消息通信协议(协议设计及解析应用实战)

          本文链接:https://www.haomeiwen.com/subject/cgtqtrtx.html