美文网首页IT@程序员猿媛SpringBoot精选简友广场
Netty中的编解码,至少知道这两种

Netty中的编解码,至少知道这两种

作者: 程就人生 | 来源:发表于2019-12-03 23:20 被阅读0次
    天气虽冷,但晚霞挺美!

    本文涉及的知识点包括:
    1.Netty中使用的主流编解码框架有哪些;
    2.如何不写服务端、客户端就可以对编解码或hanlder进行测试;
    3.如何把编解码两个类合成一个类,以及更简便的方法;

    使用Netty框架有一段时间了,存在很多疑惑。其中之一的疑惑,可以使用的编解码协议那么多,看得眼花缭乱,到底该使用哪个呢?就是因为多,所以不知道该选择哪个,特别是对于新手而言,这太难了!
    Netty中粘包拆包,小试牛刀
    Netty整合MessagePack、LengthFieldBasedFrameDecoder解决粘包/拆包问题
    Netty整合Protobuf编解码,并解决半包问题
    Netty整合JBoss Marshalling编解码

    上面是整理的编解码案例,使用的主流编解码框架如下,案例中没有最后一个,Facebook的Thrift也是当今主流编解码框架之一,在这里也列出来吧,有时间可以研究研究。

    Google的Protobuf
    JBoss的Marshalling
    MessagePack
    Facebook的Thrift

    除了上面的这几种编解码,还有下面这些Netty内置的、开箱即用的处理粘包/半包解码类,可以配合上面的编解码使用;

    FixedLengthFrameDecoder
    LineBasedFrameDecoder
    DelimiterBasedFrameDecoder
    LengthFieldBasedFrameDecoder

    通过这些类的整理,并查看了相关源代码,可以发现,虽然可以使用的编解码协议很多,整理一下也不外乎两种,一种是对象之间互转,另一种是对象和二进制之间互转。

    在Netty中就有这么两对类,ByteToMessageDecoder、MessageToByteEncoder、
    MessageToMessageDecoder、MessageToMessageEncoder。

    更有意思的是:
    FixedLengthFrameDecoder extends ByteToMessageDecoder
    LineBasedFrameDecoder extends ByteToMessageDecoder
    DelimiterBasedFrameDecoder extends ByteToMessageDecoder
    LengthFieldBasedFrameDecoder extends ByteToMessageDecoder

    没错,这四种处理粘包/半包的类都继承了ByteToMessageDecoder,只是各自的实现方式不同而已。也就是说,我们可以根据业务需求继承ByteToMessageDecoder个性化编解码类。另外,其他一些编解码的类,也和这两对类脱不了干系。

    MarshallingDecoder extends LengthFieldBasedFrameDecoder
    ProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder>

    有时候,既写编码类又写解码类,还要在使用时写两次,很麻烦,把编解码写在一个类里,又会影响调用时候的性能;Netty提供了一个合成类供大家使用,它就是CombinedChannelDuplexHandler,把编解码写在一起,但是又不影响使用的性能,真是一举两得。

    基于Netty框架写个demo不容易,既写服务端还要写客户端,这个棘手的问题Netty框架也替我们考虑了,有一个内置的类可以用,它便是EmbeddedChannel,有了这两个强大的类,接下来可以很方便的写demo了。

    下面就写一个二进制转化为String的编解码测试用例,第一个类是编解码合成类;

    import java.util.List;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.CombinedChannelDuplexHandler;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * 二进制和String的相互转换
     * @author 程就人生
     * @date 2019年12月3日
     */
    public class StringBinaryCodec extends CombinedChannelDuplexHandler<ByteToStringDecoder, StringToByteEncoder>{
    
        //将编解码类合在一起
        public StringBinaryCodec(){
            super(new ByteToStringDecoder(), new StringToByteEncoder());
        }
    }
    
    /**
     * 把二进制解码成String字符串
     * @author 程就人生
     * @date 2019年12月3日
     */
    class ByteToStringDecoder extends ByteToMessageDecoder{
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // 标记一下当前的readIndex的位置
            in.markReaderIndex();
            // 判断包头长度
            if (in.readableBytes() < 4) {// 不够包头
                return;
            }           
            // 读取传送过来的消息的长度。
            int length = in.readInt();
            // 长度如果小于0
            if (length < 0) {// 非法数据,关闭连接
                ctx.close();
            }
            // 读到的消息体长度如果小于传送过来的消息长度
            if (length > in.readableBytes()) {
                // 重置读取位置
                in.resetReaderIndex();
                return;
            }
            // 直接缓冲,读取字符串
            byte[] array = new byte[length];
            in.readBytes(array, 0, length);
            
            String str = new String(array,"utf-8");
            out.add(str);
            System.out.println("入站数据:"+ str);
        }
        
    }
    
    /**
     * 把字符串编码成二进制
     * @author 程就人生
     * @date 2019年12月3日
     */
    class StringToByteEncoder extends MessageToByteEncoder<String> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
            
            System.out.println("出站数据:"+msg);
            
            // 将对象转换为byte
            byte[] bytes = msg.getBytes();
    
            // 读取消息的长度
            int length = bytes.length;
    
            // 先将消息长度写入,也就是消息头
            out.writeInt(length);
            
            // 消息体中包含我们要发送的数据
            out.writeBytes(msg.getBytes());         
        }       
    }
    

    第二个是测试用例:

    import java.nio.charset.Charset;
    
    import org.junit.Test;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.embedded.EmbeddedChannel;
    
    /**
     * 测试用例
     * @author 程就人生
     * @date 2019年12月3日
     */
    public class TestCode {
    
        @SuppressWarnings("rawtypes")
        @Test
        public void test(){
            //通道初始化
            ChannelInitializer channelInitializer =  new ChannelInitializer<EmbeddedChannel>(){
                @Override
                protected void initChannel(EmbeddedChannel ch) throws Exception {
                    //编解码合二为一,用一个代替两个
                    ch.pipeline().addLast(new StringBinaryCodec());
    //              ch.pipeline().addLast(new ByteToStringDecoder());
    //              ch.pipeline().addLast(new StringToByteEncoder());
                }           
            };      
            
            //内置的测试类
            EmbeddedChannel channel = new EmbeddedChannel(channelInitializer);
            //入站模拟
            String str = "这是入站数据啦";         
            for(int i=0;i<100;i++){
                byte[] bytes = (str+i).getBytes(Charset.forName("utf-8"));
                ByteBuf buf = Unpooled.buffer(bytes.length + 4);
                buf.writeInt(bytes.length);
                buf.writeBytes(bytes);
                channel.writeOneInbound(buf);           
            }
            
            //出站模拟测试        
            str = "你好呀,这是出站数据啦";
            for(int j=0;j<100;j++){
                channel.writeAndFlush(str+j);
            }           
        }
    }
    

    测试结果:

    入站数据:这是入站数据啦0
    入站数据:这是入站数据啦1
    入站数据:这是入站数据啦2
    入站数据:这是入站数据啦3
    入站数据:这是入站数据啦4
    
    出站数据:你好呀,这是出站数据啦0
    出站数据:你好呀,这是出站数据啦1
    出站数据:你好呀,这是出站数据啦2
    出站数据:你好呀,这是出站数据啦3
    出站数据:你好呀,这是出站数据啦4
    

    肯定有小伙伴说,Netty不是提供了ByteToMessageCodec合成类吗,你怎么还自个儿发明轮子?没错,直接继承ByteToMessageCodec,写起来代码更简洁,下面就继承一下吧;

    import java.util.List;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageCodec;
    
    /**
     * 二进制和String的相互转换
     * @author 程就人生
     * @date 2019年12月3日
     */
    public class StringBinaryCodec2 extends ByteToMessageCodec<String>{
    
        @Override
        protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
            System.out.println("出站数据:"+msg);
            
            // 将对象转换为byte
            byte[] bytes = msg.getBytes();
    
            // 读取消息的长度
            int length = bytes.length;
    
            // 先将消息长度写入,也就是消息头
            out.writeInt(length);
            
            // 消息体中包含我们要发送的数据
            out.writeBytes(msg.getBytes()); 
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // 标记一下当前的readIndex的位置
            in.markReaderIndex();
            // 判断包头长度
            if (in.readableBytes() < 4) {// 不够包头
                return;
            }           
            // 读取传送过来的消息的长度。
            int length = in.readInt();
            // 长度如果小于0
            if (length < 0) {// 非法数据,关闭连接
                ctx.close();
            }
            // 读到的消息体长度如果小于传送过来的消息长度
            if (length > in.readableBytes()) {
                // 重置读取位置
                in.resetReaderIndex();
                return;
            }
            // 直接缓冲,读取字符串
            byte[] array = new byte[length];
            in.readBytes(array, 0, length);
            
            String str = new String(array,"utf-8");
            out.add(str);
            System.out.println("入站数据:"+ str);
        }   
    }
    

    测试一下,效果一样的,真是棒极了。

    下面,再来写一个对象转对象的编解码类,其中的一个对象需要借助于Google的protobuf进行生成,不会的可以翻看以前的文章。
    这里就直接继承Netty提供的MessageToMessageCodec编解码合成类吧,来看看代码;

    import java.util.List;
    
    import com.example.demo.test.PersonMsg.person;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageCodec;
    import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
    /**
     * 对象到对象的编码示例
     * @author 程就人生
     * @date 2019年12月3日
     */
    public class ObjectsCodec extends MessageToMessageCodec<BinaryWebSocketFrame, PersonMsg.person>{
    
        /**
         * 出站编码
         */
        @Override
        protected void encode(ChannelHandlerContext ctx, person msg, List<Object> out) throws Exception {
            ByteBuf buf = Unpooled.buffer();
            buf.writeBytes(msg.toByteArray());      
            BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame(buf);      
            out.add(binaryWebSocketFrame);
            System.out.print("出站数据:" + msg.toString());
        }
    
        /**
         * 入站解码
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, BinaryWebSocketFrame msg, List<Object> out) throws Exception {
            PersonMsg.person person = PersonMsg.person.parseFrom(msg.content().array());
            System.out.print("入站数据“" + person.toString());
            out.add(person);
        }   
    }
    

    最后测试类:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
    
    /**
     * 测试用例
     * @author 程就人生
     * @date 2019年12月3日
     */
    public class TestCode2 {
    
        @SuppressWarnings("rawtypes")
        @Test
        public void test(){
            //通道初始化
            ChannelInitializer channelInitializer =  new ChannelInitializer<EmbeddedChannel>(){
                @Override
                protected void initChannel(EmbeddedChannel ch) throws Exception {
                    //编解码合二为一,用一个代替两个
                    ch.pipeline().addLast(new ObjectsCodec());
                }           
            };      
            
            //内置的测试类
            EmbeddedChannel channel = new EmbeddedChannel(channelInitializer);
            //入站模拟
            PersonMsg.person.Builder rb;            
            for(int i=0;i<100;i++){
                rb = PersonMsg.person.newBuilder().setUid(i+"").setUsername("aaaa"+i);
                byte[] bytes = rb.build().toByteArray();
                ByteBuf buf = Unpooled.buffer(bytes.length);
                buf.writeBytes(bytes);
                
                BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame(buf);      
                channel.writeOneInbound(binaryWebSocketFrame);          
            }
            
            //出站模拟测试        
            for(int j=0;j<100;j++){
                rb = PersonMsg.person.newBuilder().setUid(j+"").setUsername("bbbb"+j);
                channel.writeAndFlush(rb.build());
            }           
        }
    }
    

    测试结果如下:

    入站数据“uid: "0"
    username: "aaaa0"
    入站数据“uid: "1"
    username: "aaaa1"
    入站数据“uid: "2"
    username: "aaaa2"
    入站数据“uid: "3"
    username: "aaaa3"
    入站数据“uid: "4"
    username: "aaaa4"
    
    出站数据:uid: "0"
    username: "bbbb0"
    出站数据:uid: "1"
    username: "bbbb1"
    出站数据:uid: "2"
    username: "bbbb2"
    出站数据:uid: "3"
    username: "bbbb3"
    出站数据:uid: "4"
    username: "bbbb4"
    

    总结
    不管是服务器端还是客户端,数据接收不到,那就只有一个原因,服务器端和客户端的编解码不一样

    相关文章

      网友评论

        本文标题:Netty中的编解码,至少知道这两种

        本文链接:https://www.haomeiwen.com/subject/prjhgctx.html