美文网首页首页投稿(暂停使用,暂停投稿)今日看点程序员
netty实战一之TCP粘包问题和不同编解码技术的应用

netty实战一之TCP粘包问题和不同编解码技术的应用

作者: 谜碌小孩 | 来源:发表于2016-10-30 20:04 被阅读0次

    TCP粘包拆包问题

    1. LineBasedFrameDecoder+StringDecoder 换行符为结束标志
    2. DelimiterBasedFrameDecoder + StringDecoder 分隔符作为结束标志
    3. FixedLengthFrameDecoder + StringDecoder 定长

    代码示例(根据《Netty权威指南》中的例子修改,不多解释)

    EchoServerHandler.class

    /**
     * Server-side inbound handler of the framing demo.
     *
     * <p>Each message that reaches this handler is printed to standard output wrapped in square
     * brackets. The active code path writes nothing back to the client.
     */
    public class EchoServerHandler extends ChannelInboundHandlerAdapter{
        /** Message counter; only referenced by the delimiter-based variant kept as a comment below. */
        int counter;

        /**
         * Prints the received message.
         *
         * @param ctx context of the channel the message arrived on
         * @param msg message handed over by the previous handler in the pipeline
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Delimiter-based echo variant (appends the "$_" delimiter and writes the text back):
            //   String body = (String) msg;
            //   System.out.println("This is " + ++counter + " times received client:[" + body + "]");
            //   body += "$_";
            //   ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
            //   ctx.writeAndFlush(echo);
            StringBuilder line = new StringBuilder("Received client : [");
            line.append(msg).append("]");
            System.out.println(line.toString());
        }

        /**
         * Prints the stack trace of the failure and closes the channel.
         *
         * @param ctx   context of the failing channel
         * @param cause the exception that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    EchoServer.class

    /**
     * Server of the TCP framing demo.
     *
     * <p>Every accepted connection gets a pipeline of a {@link FixedLengthFrameDecoder} (20-byte
     * frames), a {@link StringDecoder} and an {@link EchoServerHandler}.
     */
    public class EchoServer {
        /**
         * Binds the server to the given port and blocks until the server channel is closed.
         * Both event loop groups are shut down before the method returns.
         *
         * @param port TCP port to listen on
         * @throws InterruptedException if the thread is interrupted while waiting for the bind
         *                              or for the channel to close
         */
        public void bind(int port) throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup,workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG,100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                // Delimiter-based alternative: replace the FixedLengthFrameDecoder with
                                //   new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("$_".getBytes()))
                                // The delimiter buffer is no longer allocated here because the
                                // fixed-length pipeline never used it.
                                socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
                                socketChannel.pipeline().addLast(new StringDecoder());
                                socketChannel.pipeline().addLast(new EchoServerHandler());
                            }
                        });
                ChannelFuture future = bootstrap.bind(port).sync();
                // Keep the calling thread parked until the listening channel goes away.
                future.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }

        }

        /**
         * Starts the server on port 8080, or on the port given as the first argument.
         *
         * @param args optional; {@code args[0]} is parsed as the port number
         * @throws InterruptedException if the server thread is interrupted
         */
        public static void main(String[] args) throws InterruptedException {
            int port = 8080;
            if(args != null && args.length > 0){
                try {
                    port = Integer.parseInt(args[0]);
                } catch (NumberFormatException e) {
                    // Best effort: keep the default port, but report that the argument was ignored.
                    System.err.println("Invalid port '" + args[0] + "', using default " + port);
                }
            }
            new EchoServer().bind(port);
        }
    }
    

    EchoClientHandler.class

    /**
     * Client-side handler of the framing demo.
     *
     * <p>Once the channel is active it sends {@link #ECHO_REQ} ten times, then prints every message
     * it receives together with a running counter.
     */
    public class EchoClientHandler extends ChannelInboundHandlerAdapter {
        /** Number of messages received so far. */
        private int counter;

        /** Request text; it ends with the "$_" delimiter. */
        static final String ECHO_REQ = "Hi,welcome to netty.$_";

        public EchoClientHandler() {
        }

        /**
         * Sends the request ten times, flushing after every write.
         *
         * @param ctx context of the channel that just became active
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for( int i = 0 ; i < 10 ; i++){
                ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
            }
        }

        /**
         * Prints the received message and increments the counter.
         *
         * @param ctx context of the channel the message arrived on
         * @param msg message handed over by the previous handler in the pipeline
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("This is " + ++counter + " times received client:[" + msg + "]");
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        /**
         * Prints the stack trace and closes the channel.
         *
         * <p>Closing matters here: the client's connect method waits on the channel's close
         * future, so a failure that leaves the channel open would keep the client blocked. This
         * also matches the other handlers of the demo.
         *
         * @param ctx   context of the failing channel
         * @param cause the exception that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    EchoClient.class

    /**
     * Client of the TCP framing demo.
     *
     * <p>The pipeline consists of a {@link DelimiterBasedFrameDecoder} using "$_" as the delimiter
     * (maximum frame length 1024), a {@link StringDecoder} and an {@link EchoClientHandler}.
     */
    public class EchoClient {
        /**
         * Connects to the given host and port and blocks until the channel is closed.
         * The event loop group is shut down before the method returns.
         *
         * @param port TCP port of the server
         * @param host host name or address of the server
         * @throws InterruptedException if the thread is interrupted while waiting for the connect
         *                              or for the channel to close
         */
        public void connect(int port,String host) throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            try {
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY,true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                                socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                                socketChannel.pipeline().addLast(new StringDecoder());
                                socketChannel.pipeline().addLast(new EchoClientHandler());
                            }
                        });
                ChannelFuture future = b.connect(host,port).sync();
                // Keep the calling thread parked until the connection is closed.
                future.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }

        /**
         * Connects to 127.0.0.1 on port 8080, or on the port given as the first argument.
         *
         * @param args optional; {@code args[0]} is parsed as the port number
         * @throws InterruptedException if the client thread is interrupted
         */
        public static void main(String[] args) throws InterruptedException {
            int port = 8080;
            if(args != null && args.length > 0){
                try {
                    port = Integer.parseInt(args[0]);
                } catch (NumberFormatException e) {
                    // Best effort: keep the default port, but report that the argument was ignored.
                    System.err.println("Invalid port '" + args[0] + "', using default " + port);
                }
            }
            new EchoClient().connect(port,"127.0.0.1");
        }
    }
    

    编解码技术

    Java序列化

    1. 无法跨语言
    2. 序列化后流太大
    3. 序列化性能太低

    Google的Protobuf

    1. 语言无关
    2. 高效
    3. 结构化数据存储格式(类似于XML、JSON等)
    4. 官方支持Java C++ Python

    Facebook的Thrift

    • 优缺点
    1. 适用于静态的数据交换,需要首先确定好它的数据结构;当数据结构发生变化时,必须重新编辑IDL文件,并重新生成代码和编译
    2. 支持多种程序语言
    • 组成
    1. 语言系统以及IDL编译器
    2. TProtocol:RPC协议层,可以选择多种不同的对象序列化方式
    3. TTransport:RPC的传输层
    4. TProcessor:协议层和用户服务之间的纽带,负责调用服务实现的接口
    5. TServer:聚合TProtocol、TTransport、TProcessor等对象

    netty使用不同解码方式

    在netty中使用java序列化

    public class SubscribeReq implements Serializable {
        private static final long serialVersionUID = 2394470753940199464L;
        private int subReqId;
        private String userName;
        private String productName;
        private String phoneNumber;
        private String address;
    
        public int getSubReqId() {
            return subReqId;
        }
    
        public void setSubReqId(int subReqId) {
            this.subReqId = subReqId;
        }
    
        public String getUserName() {
            return userName;
        }
    
        public void setUserName(String userName) {
            this.userName = userName;
        }
    
        public String getProductName() {
            return productName;
        }
    
        public void setProductName(String productName) {
            this.productName = productName;
        }
    
        public String getPhoneNumber() {
            return phoneNumber;
        }
    
        public void setPhoneNumber(String phoneNumber) {
            this.phoneNumber = phoneNumber;
        }
    
        public String getAddress() {
            return address;
        }
    
        public void setAddress(String address) {
            this.address = address;
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
    
            SubscribeReq that = (SubscribeReq) o;
    
            if (subReqId != that.subReqId) return false;
            if (userName != null ? !userName.equals(that.userName) : that.userName != null) return false;
            if (productName != null ? !productName.equals(that.productName) : that.productName != null) return false;
            if (phoneNumber != null ? !phoneNumber.equals(that.phoneNumber) : that.phoneNumber != null) return false;
            return address != null ? address.equals(that.address) : that.address == null;
    
        }
    
        @Override
        public int hashCode() {
            int result = subReqId;
            result = 31 * result + (userName != null ? userName.hashCode() : 0);
            result = 31 * result + (productName != null ? productName.hashCode() : 0);
            result = 31 * result + (phoneNumber != null ? phoneNumber.hashCode() : 0);
            result = 31 * result + (address != null ? address.hashCode() : 0);
            return result;
        }
    
        @Override
        public String toString() {
            return "SubscribeReq{" +
                    "subReqId=" + subReqId +
                    ", userName='" + userName + '\'' +
                    ", productName='" + productName + '\'' +
                    ", phoneNumber='" + phoneNumber + '\'' +
                    ", address='" + address + '\'' +
                    '}';
        }
    }
    
    public class SubscribeResp implements Serializable {
        private static final long serialVersionUID = 1L;
        private int subReqId;
        private int respCode;
        private String desc;
    
        public int getSubReqId() {
            return subReqId;
        }
    
        public void setSubReqId(int subReqId) {
            this.subReqId = subReqId;
        }
    
        public int getRespCode() {
            return respCode;
        }
    
        public void setRespCode(int respCode) {
            this.respCode = respCode;
        }
    
        public String getDesc() {
            return desc;
        }
    
        public void setDesc(String desc) {
            this.desc = desc;
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
    
            SubscribeResp that = (SubscribeResp) o;
    
            if (subReqId != that.subReqId) return false;
            if (respCode != that.respCode) return false;
            return desc != null ? desc.equals(that.desc) : that.desc == null;
    
        }
    
        @Override
        public int hashCode() {
            int result = subReqId;
            result = 31 * result + respCode;
            result = 31 * result + (desc != null ? desc.hashCode() : 0);
            return result;
        }
    
        @Override
        public String toString() {
            return "SubscribeResp{" +
                    "subReqId=" + subReqId +
                    ", respCode=" + respCode +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }
    

    在netty中使用protobuf

    1. 安装protobuf

    下载、解压后编译安装:./configure && make && make check && sudo make install && sudo ldconfig

    2. 编写proto文件

      // Schema of the subscription request sent by the client.
      syntax="proto3";
      // Java package of the generated code.
      option java_package = "com.oneapm.netty.opendecode.protobuf";
      // Name of the generated outer Java class that contains the message class.
      option java_outer_classname = "SubscribeReqProto";
      
      // A subscription request.
      message SubscribeReq {
          // Identifier of the request.
          int32 subReqId = 1;
          // Name of the subscribing user.
          string userName = 2;
          // Name of the product being subscribed to.
          string productName = 3;
          // Zero or more addresses.
          repeated string address = 4;
      }
      
      
      // Schema of the subscription response returned by the server.
      syntax="proto3";
      // Java package of the generated code.
      option java_package = "com.oneapm.netty.opendecode.protobuf";
      // Name of the generated outer Java class that contains the message class.
      option java_outer_classname = "SubscribeRespProto";
      
      // A subscription response.
      message SubscribeResp {
          // Identifier of the request this response answers.
          int32 subReqId = 1;
           // Result code of the request.
           int32 respCode = 2;
          // Human-readable description of the result.
          string desc = 3;
      }
      
    3. 自动生成Java源文件
      proto文件放在与/src/main/java平级的/src/main/protobuf下

    cd src/main/protobuf
    /usr/local/bin/protoc --java_out=../java/ SubscribeReq.proto
    /usr/local/bin/protoc --java_out=../java/ SubscribeResp.proto

    4. 在server和client增加编码解码

    通用代码

    server端

    /**
     * Server-side handler of the subscription demo.
     *
     * <p>Requests whose user name equals "oneapm" (ignoring case) are printed and answered with a
     * response carrying the same request id; all other requests are ignored.
     */
    public class SubReqServerHandler extends ChannelInboundHandlerAdapter {
        /**
         * Handles one decoded subscription request.
         *
         * @param ctx context of the channel the request arrived on
         * @param msg the decoded request; cast to the protobuf request type
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Java-serialization variant: SubscribeReq req = (SubscribeReq) msg;
            SubscribeReqProto.SubscribeReq request = (SubscribeReqProto.SubscribeReq) msg;
            if (!"oneapm".equalsIgnoreCase(request.getUserName())) {
                return;
            }
            System.out.println("Service accept client subscribe req:[" + request + "]");
            ctx.writeAndFlush(resp(request.getSubReqId()));
        }

        // Java-serialization variant of the response factory:
        //    private SubscribeResp resp(int subReqId) {
        //        SubscribeResp resp = new SubscribeResp();
        //        resp.setSubReqId(subReqId);
        //        resp.setRespCode(0);
        //        resp.setDesc("Welcome to China");
        //        return resp;
        //    }

        /**
         * Builds the protobuf response for a request.
         *
         * @param subReqId id of the request being answered
         * @return a response with that id, response code 0 and a fixed description
         */
        private SubscribeRespProto.SubscribeResp resp(int subReqId) {
            return SubscribeRespProto.SubscribeResp.newBuilder()
                    .setSubReqId(subReqId)
                    .setRespCode(0)
                    .setDesc("Welcome to China")
                    .build();
        }

        /**
         * Prints the stack trace of the failure and closes the channel.
         *
         * @param ctx   context of the failing channel
         * @param cause the exception that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    /**
     * Server of the subscription demo.
     *
     * <p>Every accepted connection gets the protobuf pipeline: varint32 frame decoder, protobuf
     * decoder for {@code SubscribeReq}, varint32 length prepender, protobuf encoder and finally a
     * {@link SubReqServerHandler}. The Java-serialization pipeline is kept as a comment.
     */
    public class SubReqServer {
        /**
         * Binds the server to the given port and blocks until the server channel is closed.
         * Both event loop groups are shut down before the method returns.
         *
         * @param port TCP port to listen on
         * @throws InterruptedException if the thread is interrupted while waiting for the bind
         *                              or for the channel to close
         */
        public void bind(int port) throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup,workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG,100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                // Java-serialization pipeline:
    //                            socketChannel.pipeline().addLast(new ObjectDecoder(1024*1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
    //                            socketChannel.pipeline().addLast(new ObjectEncoder());
    //                            socketChannel.pipeline().addLast(new SubReqServerHandler());

                                // Protobuf pipeline:
                                socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                                socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                                socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                                socketChannel.pipeline().addLast(new ProtobufEncoder());
                                socketChannel.pipeline().addLast(new SubReqServerHandler());
                            }
                        });
                ChannelFuture future = bootstrap.bind(port).sync();
                // Keep the calling thread parked until the listening channel goes away.
                future.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }

        }

        /**
         * Starts the server on port 8080, or on the port given as the first argument.
         *
         * @param args optional; {@code args[0]} is parsed as the port number
         * @throws InterruptedException if the server thread is interrupted
         */
        public static void main(String[] args) throws InterruptedException {
            int port = 8080;
            if(args != null && args.length > 0){
                try {
                    port = Integer.parseInt(args[0]);
                } catch (NumberFormatException e) {
                    // Best effort: keep the default port, but report that the argument was ignored.
                    System.err.println("Invalid port '" + args[0] + "', using default " + port);
                }
            }
            new SubReqServer().bind(port);
        }
    }
    

    client端

    /**
     * Client-side handler of the subscription demo.
     *
     * <p>Once the channel is active it writes ten subscription requests with ids 0 to 9 and flushes
     * them together, then prints every response it receives.
     */
    public class SubReqClientHandler extends ChannelInboundHandlerAdapter {
        public SubReqClientHandler() {
        }

        /**
         * Queues ten requests and flushes them in one go.
         *
         * @param ctx context of the channel that just became active
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            int requestId = 0;
            while (requestId < 10) {
                ctx.write(subReq(requestId));
                requestId++;
            }
            ctx.flush();
        }

        // Java-serialization variant of the request factory:
        //    private SubscribeReq subReq(int i){
        //        SubscribeReq req = new SubscribeReq();
        //        req.setAddress("Three Gorges Dam, Yichang");
        //        req.setProductName("netty in-depth study");
        //        req.setPhoneNumber("185********");
        //        req.setSubReqId(i);
        //        req.setUserName("oneapm");
        //        return req;
        //    }

        /**
         * Builds the protobuf request with the given id.
         *
         * @param i id to store in the request
         * @return a request for user "oneapm" and product "netty study" with two addresses
         */
        private SubscribeReqProto.SubscribeReq subReq(int i){
            List<String> addresses = new ArrayList<>();
            addresses.add("NanJing");
            addresses.add("BeiJing");
            return SubscribeReqProto.SubscribeReq.newBuilder()
                    .addAllAddress(addresses)
                    .setProductName("netty study")
                    .setSubReqId(i)
                    .setUserName("oneapm")
                    .build();
        }

        /**
         * Prints the response received from the server.
         *
         * @param ctx context of the channel the response arrived on
         * @param msg message handed over by the previous handler in the pipeline
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String line = "Receive server response: [" + msg + "]";
            System.out.println(line);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        /**
         * Prints the stack trace of the failure and closes the channel.
         *
         * @param ctx   context of the failing channel
         * @param cause the exception that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    /**
     * Client of the subscription demo.
     *
     * <p>The pipeline is the protobuf one: varint32 frame decoder, protobuf decoder for
     * {@code SubscribeResp}, varint32 length prepender, protobuf encoder and finally a
     * {@link SubReqClientHandler}. The Java-serialization pipeline is kept as a comment.
     */
    public class SubReqClient {
        /**
         * Connects to the given host and port and blocks until the channel is closed.
         * The event loop group is shut down before the method returns.
         *
         * @param port TCP port of the server
         * @param host host name or address of the server
         * @throws InterruptedException if the thread is interrupted while waiting for the connect
         *                              or for the channel to close
         */
        public void connect(int port,String host) throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            try {
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY,true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                // Java-serialization pipeline:
    //                            socketChannel.pipeline().addLast(new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
    //                            socketChannel.pipeline().addLast(new ObjectEncoder());
    //                            socketChannel.pipeline().addLast(new SubReqClientHandler());

                                // Protobuf pipeline:
                                socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                                socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                                socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                                socketChannel.pipeline().addLast(new ProtobufEncoder());
                                socketChannel.pipeline().addLast(new SubReqClientHandler());
                            }
                        });
                ChannelFuture future = b.connect(host,port).sync();
                // Keep the calling thread parked until the connection is closed.
                future.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }

        /**
         * Connects to 127.0.0.1 on port 8080, or on the port given as the first argument.
         *
         * @param args optional; {@code args[0]} is parsed as the port number
         * @throws InterruptedException if the client thread is interrupted
         */
        public static void main(String[] args) throws InterruptedException {
            int port = 8080;
            if(args != null && args.length > 0){
                try {
                    port = Integer.parseInt(args[0]);
                } catch (NumberFormatException e) {
                    // Best effort: keep the default port, but report that the argument was ignored.
                    System.err.println("Invalid port '" + args[0] + "', using default " + port);
                }
            }
            new SubReqClient().connect(port,"127.0.0.1");
        }

    }
    

    facebook/nifty:Thrift on Netty

    facebook提供的构建在Thrift和Netty之上的封装https://github.com/facebook/nifty,简单来说,就是让我们的编码更加简单(脑残)

    项目github地址

    https://github.com/engimatic/effectivejava/tree/master/netty/src/main/java/com/oneapm/netty

    相关文章

      网友评论

        本文标题:netty实战一之TCP粘包问题和不同编解码技术的应用

        本文链接:https://www.haomeiwen.com/subject/ceeuuttx.html