美文网首页SpringBoot精选IT@程序员猿媛
Netty整合Protobuf编解码,并解决半包问题

Netty整合Protobuf编解码,并解决半包问题

作者: 程就人生 | 来源:发表于2019-10-23 19:03 被阅读0次

    Google的工具在平时的开发中也用过,但是听到Protobuf编解码框架时,还是忍不住去用一用,想知道它为什么在业界这么流行,而且是非常流行。

    在使用Google Protobuf编解码框架时,还需要搭建环境,这一点有点麻烦,想着后面可能带来的便利,这一步忍了;本文有两部分,第一部分Protobuf编解码java文件的生成;第二部分Protobuf编解码示例。

    第一部分,Protobuf编解码java文件的生成;

    首先,下载生成代码的压缩包,打开https://github.com/protocolbuffers/protobuf/releases页面,本次使用的压缩包是protoc-3.10.0-win32;

    下载截图

    第二步,对下载后的压缩包解压,根据需要编写proto文件,文件的后缀为.proto;

    syntax = "proto3";
    option java_outer_classname="ProtoMsgProto";
    message ProtoMsg{
        int32 subReqId = 1;
        string desc = 2;    
    }
    

    语法简要说明:

    1. syntax代表版本号,默认proto2,这里使用的是3,所以必须是proto3;
    2. package为生成的java加包名,防止message类冲突;
    3. option java_outer_classname是要输出的java文件的类名,不可与message后的名字重复;
    4. option java_package为每个message类加包名;
    5. option java_multiple_files默认false,表示生成java时的打包方式;
      false表示所有的消息都作为内部类,打包到一个外部类中;
      true代表一个消息打包成一个java类;

    把option java_multiple_files=true;加入到proto文件里,再次使用命令生成,可以看到生成的java文件有三个,也就是一个message对应三个java类文件。


    java_multiple_files为true生成的文件

    第三步,使用管理员的身份打开cmd窗口,进入解压的bin文件夹下,输入命令进行java文件的生成,

    C:\protoc-3.10.0-win32\bin>protoc.exe ProtoMsg.proto --java_out=./
    
    protoc生成的java文件

    在使用protoc.exe生成java文件的时候,我们使用的是3.10.0版本的工具,所以在项目里的pom中引入文件的时候,版本也需要与生成文件的相匹配。

    使用protoc.exe生成java文件的方法有多种,这里使用的是最简单便捷的方法,也不需要配置什么环境变量,能省一步是一步。还有一种方法,就是在maven里增加插件与对应的配置来生成java文件,个人感觉比较麻烦,还不如使用cmd命令行来的快。

    接下来,进行服务端客户端代码的开发,由于protobuf仅仅支持解码编码,并没有处理粘包/半包的问题,所以还需要借助其他的工具类进行粘包/半包的处理。

    第二部分,Protobuf编解码示例;

    首先,pom文件的引入;

    <!-- netty架包依赖 -->
            <dependency>
              <groupId>io.netty</groupId>
              <artifactId>netty-all</artifactId>
            </dependency>
    <!-- protobuf架包 -->
            <dependency>    
                <groupId>com.google.protobuf</groupId>  
                <artifactId>protobuf-java</artifactId>  
                <version>3.10.0</version>
            </dependency>
    

    第二,服务端的编码,ProtoMsgProto.java文件是通过protoc.exe生成的文件,这里就忽略不贴代码了;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    /**
     * netty整合protobuf,服务端模拟
     * @author 程就人生
     * @date 2019年10月22日
     */
    public class ProtobufServer {
    
        public void bind(int port){
            //开启线程组
            EventLoopGroup parentGroup = new NioEventLoopGroup();
            EventLoopGroup childGroup = new NioEventLoopGroup();
            try{
                ServerBootstrap serverBootstrap =  new ServerBootstrap();
                serverBootstrap.group(parentGroup, childGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>(){
    
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //解码时,解决粘包/半包的问题
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        //对接收到的信息,使用protobuf进行解码
                        ch.pipeline().addLast(new ProtobufDecoder(ProtoMsgProto.ProtoMsg.getDefaultInstance()));
                        //编码时,解决粘包/半包的问题
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        //对于要发送的信息,使用protobuf进行编码
                        ch.pipeline().addLast(new ProtobufEncoder());
                        ch.pipeline().addLast(new ProtobufServerHandler());
                    }
                    
                });
                //绑定端口,同步等待成功
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                //等待服务器端监听端口关闭
                channelFuture.channel().closeFuture().sync();   
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                //优雅退出,并释放线程
                parentGroup.shutdownGracefully();
                childGroup.shutdownGracefully();
            }
        }
        
        public static void main(String[] argo){
            //启动服务端
            ProtobufServer protobufServer = new ProtobufServer();
            protobufServer.bind(8080);
        }
    }
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class ProtobufServerHandler extends ChannelInboundHandlerAdapter{
    
        private static Logger log = LoggerFactory.getLogger(ProtobufServerHandler.class);
        
        /**
         * 接收到消息时的处理
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            
            ProtoMsgProto.ProtoMsg req = (ProtoMsgProto.ProtoMsg) msg;
            if(req.getDesc().contains("程就人生")){
                log.info("接收到的消息:" + req.getDesc());
                ctx.writeAndFlush(resp(req.getSubReqId()));
            }
        }
        
        /**
         * 构造返回的消息
         * @param subReqID
         * @return
         *
         */
        private ProtoMsgProto.ProtoMsg resp(int subReqID){
            ProtoMsgProto.ProtoMsg.Builder builder = ProtoMsgProto.ProtoMsg.newBuilder();
            builder.setSubReqId(subReqID);
            builder.setDesc("服务端已经顺利接收到客户端发送的消息,消息id:" + subReqID);     
            return builder.build();
        }
        
        
        /**
         * 异常时关闭
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            //发生异常,关闭连接
            ctx.close();
        }
    }
    
    

    第三步,客户端的编码;

    package com.example.netty.client3;
    
    import com.example.netty.server3.ProtoMsgProto;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    /**
     * Netty整合protobuf客户端模拟
     * @author 程就人生
     * @date 2019年10月22日
     */
    public class ProtobufClient {
    
        public void connect(int port, String host){
            EventLoopGroup group = new NioEventLoopGroup();
            try{
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>(){
    
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //解码时,解决粘包/半包的问题
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        //对接收到的信息,使用protobuf进行解码
                        ch.pipeline().addLast(new ProtobufDecoder(ProtoMsgProto.ProtoMsg.getDefaultInstance()));
                        //编码时,解决粘包/半包的问题
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        //对于要发送的信息,使用protobuf进行编码
                        ch.pipeline().addLast(new ProtobufEncoder());
                        ch.pipeline().addLast(new ProtobufClientHandler());                 
                    }
                    
                });
                //发起异步连接操作
                ChannelFuture channelFuture = bootstrap.connect(host,port).sync();
                //等待客户端链路关闭
                channelFuture.channel().closeFuture().sync();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                //优雅退出,释放线程组
                group.shutdownGracefully();
            }
        }
        
        public static void main(String[] argo){
            //启动客户端,连接服务端
            ProtobufClient protobufClient = new ProtobufClient();
            protobufClient.connect(8080, "127.0.0.1");
        }
    }
    
    package com.example.netty.client3;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.example.netty.server3.ProtoMsgProto;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class ProtobufClientHandler extends ChannelInboundHandlerAdapter{
    
        private static Logger log = LoggerFactory.getLogger(ProtobufClientHandler.class);
        
        /**
         * 
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for(int i=0;i<100;i++){
                ctx.writeAndFlush(req(i));
            }
        }
        
        /**
         * 构造返回的消息
         * @param subReqID
         * @return
         *
         */
        private ProtoMsgProto.ProtoMsg req(int subReqID){
            ProtoMsgProto.ProtoMsg.Builder builder = ProtoMsgProto.ProtoMsg.newBuilder();
            builder.setSubReqId(subReqID);
            builder.setDesc("程就人生"+subReqID);       
            return builder.build();
        }
         
        /**
         * 接收到消息时的处理
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        
            ProtoMsgProto.ProtoMsg req = (ProtoMsgProto.ProtoMsg) msg;
            log.info("接收到的消息:" + req.getDesc());        
        }
        
        /**
         * 异常时关闭
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            ctx.close();
        }
    }
    

    最后,测试;先启动服务端,再启动客户端,查看控制台的输出;

    服务端测试结果
    客户端测试结果

    protobuf之所以这么流行,原因有很多:第一,它是跨语言的,除了java还支持python、C++等语言;第二,编码后是以二进制的形式进行传输的,传输速度快。但是,它只负责编解码,粘包/半包的问题,还需通过其他的工具类进行解决。

    除了使用上文的ProtobufVarint32FrameDecoder类进行半包的处理,还可以使用LengthFieldBasedFrameDecoder工具类来解决;LengthFieldBasedFrameDecoder工具类的使用,在Netty整合MessagePack、LengthFieldBasedFrameDecoder解决粘包/拆包问题中有说明;除此之外,还可以继承ByteToMessageDecoder类,自己处理半包消息的问题。

    Netty整合Protobuf框架进行编解码的简单操作,就这样告一段落了,更多的还需要继续看源码,加油吧。

    相关文章

      网友评论

        本文标题:Netty整合Protobuf编解码,并解决半包问题

        本文链接:https://www.haomeiwen.com/subject/ndkemctx.html