美文网首页
第3讲 编解码

第3讲 编解码

作者: 农民工进城 | 来源:发表于2018-05-06 20:04 被阅读0次

    本章要点:

    • java序列化缺点
    • 业界流行的几种编解码框架介绍

    3.1 java序列化缺点

    • 无法跨越语言,是java序列化最致命的缺点;
    • 序列化后的码流太大
    • 序列化性能太低

    3.2 主流编解码框架

    3.2.1 Google Protobuf编解码

    Protobuf在业界非常流行,Protobuf具有以下优点:

    • 产品成熟
    • 跨语言、支持多种语言
    • 编码后消息更小,便于存储和传输
    • 编解码性能高
    • 支持定义可选和必选字段
      Protobuf是一个灵活、高效、结构化的数据序列化框架,相比于XML等传统的序列化工具,它更小、更快、更简单。
      https://github.com/google/protobuf/releases
      下载protoc工具,protoc-3.5.1-win32.zip,并解压,会看到protoc.exe工具。

    User.proto的文件内容如下:

    // Declare the syntax explicitly: `required` fields exist only in proto2, and
    // protoc 3.x prints a warning (and falls back to proto2) when this line is missing.
    syntax = "proto2";

    package com.bj58.wuxian.protobuf;

    // Name of the generated Java outer class (UserProto.java).
    option java_outer_classname = "UserProto";

    // User record exchanged between the Netty client and server examples.
    message User {
        required int32 id = 1;
        required string username = 2;
        required string password = 3;

        // Nested enum used by the `sex` field below.
        enum Sex {
            nan = 1;
            nv = 2;
        }
        required Sex sex = 4;
    }
    

    执行如下命令:

    D:/develop/protoc-3.5.1-win32/bin/protoc.exe -I=./proto --java_out=D:/develop/protoc-3.5.1-win32/bin/proto/ ./proto/User.proto
    

    生成了一个UserProto.java文件。
    命令格式:protoc.exe -I=proto的输入目录 --java_out=java类输出目录 proto文件路径(proto文件需位于输入目录内)
    此时将UserProto.java文件拷贝到IDE中会报错,那是因为缺少包依赖:

            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>3.5.1</version>
            </dependency>
    

    protobuf序列化传输代码实例:
    UserServer代码:

    package com.bj58.wuxian.netty.codec.protobuf;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    import com.bj58.wuxian.protobuf.UserProto;
    
    /**
     * Netty server for the Protobuf example: accepts connections on port 8888 and
     * exchanges varint32-length-prefixed {@code UserProto.User} messages.
     */
    public class UserServer {

        /**
         * Starts the server and blocks until the listening channel is closed.
         * Any exception raised while starting or running is printed, and both
         * event-loop groups are always shut down afterwards.
         *
         * @param args unused
         * @throws InterruptedException declared for callers; failures inside the
         *         try block are caught and printed rather than propagated
         */
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup acceptGroup = new NioEventLoopGroup();
            EventLoopGroup ioGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap server = new ServerBootstrap();
                server.group(acceptGroup, ioGroup);
                server.channel(NioServerSocketChannel.class);
                server.option(ChannelOption.SO_BACKLOG, 1024);
                server.handler(new LoggingHandler(LogLevel.INFO));
                server.childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                // inbound: reassembles frames (handles partial packets)
                                new ProtobufVarint32FrameDecoder(),
                                // inbound: the argument is the prototype of the target message type
                                new ProtobufDecoder(UserProto.User.getDefaultInstance()),
                                // outbound: length prefix, then protobuf encoding
                                new ProtobufVarint32LengthFieldPrepender(),
                                new ProtobufEncoder(),
                                // business logic
                                new UserServerHandler());
                    }
                });

                // Bind, wait for the bind to finish, then wait for the channel to close.
                server.bind(8888).sync().channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                acceptGroup.shutdownGracefully();
                ioGroup.shutdownGracefully();
            }
        }
    }
    
    

    UserServerHandler 代码:

    package com.bj58.wuxian.netty.codec.protobuf;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import com.bj58.wuxian.protobuf.UserProto;
    import com.bj58.wuxian.protobuf.UserProto.User.Sex;
    
    /**
     * Server-side business handler for the Protobuf example: logs each incoming
     * {@code UserProto.User} and answers with a freshly built one.
     */
    public class UserServerHandler extends ChannelInboundHandlerAdapter {

        /** Logs that a client connection became active. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("**********server channelActive*************");
        }

        /**
         * Prints the request and writes back a reply whose username depends on
         * whether the request's username equals "zhaoshichao" (case-insensitive).
         *
         * @param ctx channel context used to send the reply
         * @param msg decoded inbound message; cast to {@code UserProto.User}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            UserProto.User request = (UserProto.User) msg;
            System.out.println("request:" + msg);

            boolean known = "zhaoshichao".equalsIgnoreCase(request.getUsername());
            String replyName = known ? "shangjing" : "guanggunhan";

            UserProto.User reply = UserProto.User.newBuilder()
                    .setId(123)
                    .setSex(Sex.nv)
                    .setPassword("45678")
                    .setUsername(replyName)
                    .build();
            ctx.writeAndFlush(reply);
        }

        /** Prints the failure and closes the connection. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

    UserClient 代码:

    package com.bj58.wuxian.netty.codec.protobuf;
    
    import com.bj58.wuxian.protobuf.UserProto;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    
    /**
     * Netty client for the Protobuf example: connects to 127.0.0.1:8888 and
     * exchanges varint32-length-prefixed {@code UserProto.User} messages.
     */
    public class UserClient {

        /**
         * Connects to the server and blocks until the connection is closed.
         * Any exception is printed, and the event-loop group is always shut down.
         *
         * @param args unused
         * @throws InterruptedException declared for callers; failures inside the
         *         try block are caught and printed rather than propagated
         */
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup ioGroup = new NioEventLoopGroup();

            try {
                Bootstrap client = new Bootstrap();
                client.group(ioGroup);
                client.channel(NioSocketChannel.class);
                client.option(ChannelOption.TCP_NODELAY, true);
                client.handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                // inbound: frame reassembly, then protobuf decoding
                                new ProtobufVarint32FrameDecoder(),
                                new ProtobufDecoder(UserProto.User.getDefaultInstance()),
                                // outbound: length prefix, then protobuf encoding
                                new ProtobufVarint32LengthFieldPrepender(),
                                new ProtobufEncoder(),
                                // business logic
                                new UserClientHandler());
                    }
                });

                // Connect, wait for the connect to finish, then wait for the channel to close.
                client.connect("127.0.0.1", 8888).sync().channel().closeFuture().sync();

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                ioGroup.shutdownGracefully();
            }
        }
    }
    
    

    UserClientHandler 代码:

    package com.bj58.wuxian.netty.codec.protobuf;
    
    import com.bj58.wuxian.protobuf.UserProto;
    import com.bj58.wuxian.protobuf.UserProto.User.Sex;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Client-side business handler for the Protobuf example: sends a burst of
     * {@code UserProto.User} requests on connect and prints every response.
     */
    public class UserClientHandler extends ChannelInboundHandlerAdapter {

        /**
         * Sends six requests (indices 0 through 5) as soon as the channel is active.
         * Even indices use the username "zhaoshichao"; odd indices append the index.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for (int seq = 0; seq < 6; seq++) {
                String name = (seq % 2 == 0) ? "zhaoshichao" : "zhaoshichao" + seq;

                UserProto.User request = UserProto.User.newBuilder()
                        .setId(123)
                        .setSex(Sex.nan)
                        .setPassword("45678")
                        .setUsername(name)
                        .build();
                ctx.writeAndFlush(request);
            }
        }

        /** Prints the decoded response; the message is cast to {@code UserProto.User}. */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println("response:" + (UserProto.User) msg);
        }

        /** Prints the failure and closes the connection. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        /** Flushes the context once the current read batch is complete. */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    }
    
    
    3.2.2 Message Pack编解码

    MessagePack特点如下:

    • 编解码高效,性能高
    • 序列化之后的码流小
    • 支持跨语言

    添加pom依赖:

            <dependency>
                <groupId>org.msgpack</groupId>
                <artifactId>msgpack</artifactId>
                <version>0.6.12</version>
            </dependency>
    

    自定义编码器:

    package com.bj58.wuxian.netty.codec.msgpack;
    
    import org.msgpack.MessagePack;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * Outbound encoder that serializes any message object with MessagePack and
     * writes the resulting bytes into the outgoing buffer.
     */
    public class MsgPackEncoder extends MessageToByteEncoder<Object> {

        /**
         * Serializes {@code msg} and appends the bytes to {@code out}.
         *
         * @param ctx channel context (unused)
         * @param msg object to serialize
         * @param out buffer that receives the serialized bytes
         */
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
                throws Exception {
            // A fresh MessagePack instance is created for every message.
            byte[] payload = new MessagePack().write(msg);
            out.writeBytes(payload);
        }
    }
    
    

    自定义解码器:

    package com.bj58.wuxian.netty.codec.msgpack;
    
    import java.util.List;
    import org.msgpack.MessagePack;
    import com.bj58.wuxian.msgpack.model.User;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    
    /**
     * Inbound decoder that turns the readable bytes of a {@code ByteBuf} into a
     * {@code User} object using MessagePack.
     */
    public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {

        /**
         * Copies all readable bytes out of {@code msg}, deserializes them as a
         * {@code User} and adds the result to {@code out}.
         *
         * @param ctx channel context (unused)
         * @param msg buffer holding the serialized bytes
         * @param out list that receives the decoded object
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
                List<Object> out) throws Exception {
            int length = msg.readableBytes();
            byte[] payload = new byte[length];
            msg.readBytes(payload);

            // A fresh MessagePack instance is created for every message.
            MessagePack codec = new MessagePack();
            out.add(codec.read(payload, User.class));
        }
    }
    
    

    UserServer代码:

    package com.bj58.wuxian.netty.codec.msgpack;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    
    /**
     * Netty server for the MessagePack example: accepts connections on port 8888
     * and exchanges {@code User} objects framed with a 2-byte length prefix.
     */
    public class UserServer {

        /**
         * Starts the server and blocks until the listening channel is closed.
         * Both event-loop groups are always shut down, including when
         * configuration or binding fails.
         *
         * @param argsStrings unused
         * @throws Exception if binding fails or the waiting thread is interrupted
         */
        public static void main(String[] argsStrings) throws Exception {
            // Server-side NIO thread groups (boss threads and worker threads).
            EventLoopGroup bGroup = new NioEventLoopGroup();
            EventLoopGroup wGroup = new NioEventLoopGroup();

            try {
                // Bootstrap helper.
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bGroup, wGroup)
                        .channel(NioServerSocketChannel.class)
                        // NOTE(review): kept from the original; a connect timeout presumably
                        // has no effect on a listening channel -- confirm and remove.
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                        // TCP_NODELAY is a per-connection option, so it must be set with
                        // childOption(); option() targets the listening channel, which
                        // does not support it.
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel channel) throws Exception {
                                // Object serialization codecs plus framing to handle
                                // sticky/partial packets.
                                // Inbound: max frame 65535, 2-byte length field at offset 0,
                                // no adjustment, strip the 2 length bytes.
                                channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                                channel.pipeline().addLast("解码器", new MsgPackDecoder());
                                // Outbound: prepend a 2-byte length field.
                                channel.pipeline().addLast(new LengthFieldPrepender(2));
                                channel.pipeline().addLast("编码器", new MsgPackEncoder());
                                channel.pipeline().addLast(new UserServerHandler());
                            }

                        });

                // Bind the local port and wait synchronously for the result.
                ChannelFuture future = bootstrap.bind(8888).sync();
                // Wait until the listening channel is closed.
                future.channel().closeFuture().sync();
            } finally {
                // Graceful exit: release the thread groups.
                bGroup.shutdownGracefully();
                wGroup.shutdownGracefully();
            }

        }
    }
    
    

    ServerHandler代码:

    package com.bj58.wuxian.netty.codec.msgpack;
    
    import com.bj58.wuxian.msgpack.model.User;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Server-side business handler for the MessagePack example: answers every
     * incoming {@code User} with a new {@code User}.
     */
    public class UserServerHandler extends ChannelInboundHandlerAdapter {

        /** Logs that a client connection became active. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("**********server channelActive**********");
        }

        /**
         * Builds and sends the reply. The password is always "1234"; the username
         * is "shangjing" when the request's username equals "zhaoshichao"
         * (case-sensitive), otherwise "others".
         *
         * @param ctx channel context used to send the reply
         * @param msg decoded inbound message; cast to {@code User}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            User request = (User) msg;
            User reply = new User();

            String replyName = "zhaoshichao".equals(request.getUsername()) ? "shangjing" : "others";
            reply.setPassword("1234");
            reply.setUsername(replyName);

            ctx.writeAndFlush(reply);
        }

        /** Prints the failure and closes the connection. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }
    
    

    User客户端:

    package com.bj58.wuxian.netty.codec.msgpack;
    
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    
    /**
     * Netty client for the MessagePack example: connects to 127.0.0.1:8888 and
     * exchanges {@code User} objects framed with a 2-byte length prefix.
     */
    public class UserClient {

        /**
         * Connects to the server and blocks until the connection is closed.
         * The event-loop group is always shut down, including when the connect
         * attempt itself fails, so its threads cannot keep the JVM running.
         *
         * @param argsStrings unused
         * @throws Exception if the connection fails or the waiting thread is interrupted
         */
        public static void main(String[] argsStrings) throws Exception {
            // Client-side NIO thread group.
            EventLoopGroup bGroup = new NioEventLoopGroup();
            try {
                // Client bootstrap helper.
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(bGroup)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel channel) throws Exception {
                                // Object serialization codecs plus framing to handle
                                // sticky/partial packets.
                                // Inbound: max frame 65535, 2-byte length field at offset 0,
                                // no adjustment, strip the 2 length bytes.
                                channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                                channel.pipeline().addLast("解码器", new MsgPackDecoder());
                                // Outbound: prepend a 2-byte length field.
                                channel.pipeline().addLast(new LengthFieldPrepender(2));
                                channel.pipeline().addLast("编码器", new MsgPackEncoder());
                                channel.pipeline().addLast(new UserClientHandler());
                            }

                        });

                // Connect inside the try block so that a failed connect still
                // reaches the finally clause below.
                ChannelFuture future = bootstrap.connect("127.0.0.1", 8888).sync();
                // Wait until the client connection is closed.
                future.channel().closeFuture().sync();
            } finally {
                // Graceful exit: release resources.
                bGroup.shutdownGracefully();
            }
        }
    }
    
    

    ClientHandler代码:

    package com.bj58.wuxian.netty.codec.msgpack;
    
    import com.bj58.wuxian.msgpack.model.User;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Client-side business handler for the MessagePack example: sends a burst of
     * {@code User} requests on connect and prints every response.
     */
    public class UserClientHandler extends ChannelInboundHandlerAdapter {

        /**
         * Sends six requests (indices 0 through 5) as soon as the channel is active.
         * Even indices use the username "zhaoshichao"; odd indices append the index.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for (int seq = 0; seq < 6; seq++) {
                User request = new User();
                request.setId(123);
                request.setPassword("45678");
                request.setUsername((seq % 2 == 0) ? "zhaoshichao" : "zhaoshichao" + seq);

                ctx.writeAndFlush(request);
            }
        }

        /** Prints the decoded response; the message is cast to {@code User}. */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println("response:" + (User) msg);
        }

        /** Prints the failure and closes the connection. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        /** Flushes the context once the current read batch is complete. */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

    }
    
    

    相关文章

      网友评论

          本文标题:第3讲 编解码

          本文链接:https://www.haomeiwen.com/subject/ekzurftx.html