美文网首页Java 进阶JavaJava 核心技术
netty(十六)Netty提升 - 自定义编解码器

netty(十六)Netty提升 - 自定义编解码器

作者: 我犟不过你 | 来源:发表于2021-11-16 16:46 被阅读0次

    我们在使用网络编程时,可以根据自己的业务场景,设计自己的协议。比如我们与外部接口对接,会使用一定特定的加密算法,使用特定的标签,以及固定格式的报文等等,都可以算作我们确定身份的协议。

    下面我们设计一个自己的协议,来实现一些功能。

    一、协议要素

    创建一个好的协议,通常来说,必然要有以下要素:

    • 魔数:用来在第一时间判定是否是无效数据包
      • 例如:Java Class文件都是以0x CAFEBABE开头的。Java这么做的原因就是为了快速判断一个文件是不是有可能为class文件,以及这个class文件有没有受损。
    • 版本号:可以支持协议的升级
    • 序列化算法:消息正文到底采用哪种序列化反序列化方式
    • 指令类型:针对业务类型指定
    • 请求序号:为了双工通信,提供异步能力,序号用于回调
    • 正文长度:没有长度会导致浏览器持续加载
    • 消息正文:具体消息内容

    二、自定义编解码器

    我们下面简单的写一个自定义编解码器,主要是为了体验过程,深刻印象。

    在写这个之前,先歇一歇准备工作;我们需要准备一个父类Message:

    import lombok.Data;
    
    import java.io.Serializable;
    
    @Data
    public abstract class Message implements Serializable {
    
        public final static int LOGIN_REQUEST_MESSAGE = 0;
    
        private int messageType;
    
        private int sequenceId;
    
        abstract int getMessageType();
    
    }
    

    一个子类LoginMessage:

    @Data
    public class LoginMessage extends Message {
    
        private String username;
    
        private String password;
    
        private Date loginTime;
    
        public LoginMessage(String username, String password, Date loginTime) {
            this.username = username;
            this.password = password;
            this.loginTime = loginTime;
        }
    
        @Override
        int getMessageType() {
            return LOGIN_REQUEST_MESSAGE;
        }
    }
    

    有了以上的基础,我们就能编写编解码器了,按照前面提到的几个要素,按照顺序逐一编写:

    public class MessageCodec extends ByteToMessageCodec<Message> {
    
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Message msg, ByteBuf out) throws Exception {
            // 4 字节的魔数
            out.writeBytes(new byte[]{1, 2, 3, 4});
            // 1 字节的版本,
            out.writeByte(1);
            // 1 字节的序列化方式 0:jdk
            out.writeByte(0);
            // 1 字节的指令类型
            out.writeByte(msg.getMessageType());
            // 4 个字节的请求序号
            out.writeInt(msg.getSequenceId());
            // 无意义,对齐填充,使其满足2的n次方
            out.writeByte(0xff);
            // 获取内容的字节数组
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(msg);
            byte[] bytes = bos.toByteArray();
            // 长度
            out.writeInt(bytes.length);
            // 写入内容
            out.writeBytes(bytes);
        }
    
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
            int magicNum = in.readInt();
            byte version = in.readByte();
            byte serializerType = in.readByte();
            byte messageType = in.readByte();
            int sequenceId = in.readInt();
            in.readByte();
            int length = in.readInt();
            byte[] bytes = new byte[length];
            in.readBytes(bytes, 0, length);
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            Message message = (Message) ois.readObject();
            System.out.println("decode:" + magicNum + "," + version + "," + serializerType + "," + messageType + "," + sequenceId + "," + length);
            System.out.println("decode:" + message);
            out.add(message);
        }
    }
    

    下面写个客户端,用于发送消息:

    public class Client {
    
        public static void main(String[] args) {
    
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(worker);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        //引入我们的编解码器
                        ch.pipeline().addLast(new MessageCodec());
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                //组装消息,并发送
                                Message message = new LoginMessage("Tom","123456",new Date());
                                ctx.writeAndFlush(message);
                                super.channelActive(ctx);
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8080);
                //阻塞等待连接
                channelFuture.sync();
                //阻塞等待释放连接
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                System.out.println("server error:" + e);
            } finally {
                // 释放EventLoopGroup
                worker.shutdownGracefully();
            }
        }
    }
    

    下面是服务端,用于接收消息和解码:

    public class Server {
    
        public static void main(String[] args) {
    
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        //引入编解码器
                        ch.pipeline().addLast(new MessageCodec());
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //打印消息内容
                                System.out.println(msg);
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = serverBootstrap.bind(8080);
                //阻塞等待连接
                channelFuture.sync();
                //阻塞等待释放连接
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                System.out.println("server error:" + e);
            } finally {
                // 释放EventLoopGroup
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    

    结果:

    decode:16909060,1,0,0,0,326
    decode:LoginMessage(username=Tom, password=123456, loginTime=Tue Nov 16 15:47:06 CST 2021)
    LoginMessage(username=Tom, password=123456, loginTime=Tue Nov 16 15:47:06 CST 2021)
    

    上面前两条是我们在编解码器中打印的日志,最后一条是我们的业务代码打印。

    16909060表示我们的魔数,这里是10进制表示的,转化成二进制其实是[01 02 03 04],后面的数字都是我们根据要素指定的。

    而326使我们发送消息的字节长度。

    存在的问题?
    其实在我们的代码当中存在问题,我们对整个请求都设置了自己的协议,会有以下两种情况:
    1)粘包,这种情况下,我们可以根据自己的规则获取到正确的消息,因为消息内容的长度等等都有指定。
    2)半包,此时将导致我们的消息获取不完全,解码失败。

    所以我们仍然要使用前一章节当中的解码器:

     ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,12,4,0,0));
    

    参数解释分别是最大长度1024,消息长度偏移量12,消息长度4,消息长度距离消息0个长度,需要从头部剥离0个长度。

    相关文章

      网友评论

        本文标题:netty(十六)Netty提升 - 自定义编解码器

        本文链接:https://www.haomeiwen.com/subject/ogshtrtx.html