美文网首页
netty解决粘包的4种方案

netty解决粘包的4种方案

作者: 念䋛 | 来源:发表于2021-09-23 13:01 被阅读0次

1. DelimiterBasedFrameDecoder使用特殊字符作为分割,如果使用的话,注意特殊字符不能在真正要传输的内容中出现,

客户端

public class NettyClient {
    public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup ();
        try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap ();
//设置相关参数
            bootstrap.group (group) //设置线程组
                    .channel (NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现
                    .handler (new ChannelInitializer<SocketChannel> () {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            //字符串编码器
                            //channel.pipeline ().addLast (new StringEncoder ());
                            //long编码器
//                            channel.pipeline ().addLast (new OutEncoder ());
                            ByteBuf delimiter = Unpooled.copiedBuffer ("$_$".getBytes ());
                            channel.pipeline ().addLast (new DelimiterBasedFrameDecoder (1024, delimiter));
                            //加入处理器
                            channel.pipeline ().addLast (new NettyClientHandler ());
                        }
                    });
            System.out.println ("netty client start");
            //启动客户端去连接服务器端
            ChannelFuture channelFuture = bootstrap.connect ("127.0.0.1", 9000).sync ();
            //对关闭通道进行监听
            channelFuture.channel ().closeFuture ().sync ();
        } finally {
            group.shutdownGracefully ();
        }
    }
}

客户端的handler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println ("发送消息");
        for (int i = 0; i < 2; i++) {
            //拆包用分隔符
            ByteBuf buf = Unpooled.copiedBuffer ("HelloServer$_$", CharsetUtil.UTF_8);
            ctx.writeAndFlush (buf);
        }
        ctx.fireChannelActive ();
    }
    //当通道有读取事件时会触发,即服务端发送数据给客户端
    //msg就额是接受的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = ( ByteBuf ) msg;
        //因为已经使用了StringDecoder传过来的已经string类型
        System.out.println ("收到服务端的消息:" + buf);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace ();
        ctx.close ();
    }
}

服务端

 public static void main(String[] args) throws Exception {

        //创建两个线程组bossGroup和workerGroup,
        //NioEventLoop的个数默认为cpu核数的两倍
        // bossGroup只是处理连接请求 ,真正的和客户端业务处理,workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup (1);
        EventLoopGroup workerGroup = new NioEventLoopGroup (8);
        try {
            //创建服务器端的引导对象
            ServerBootstrap bootstrap = new ServerBootstrap ();
            //将两个线程组放进引导对象
            bootstrap.group (bossGroup, workerGroup)
                    //NioServerSocketChannel服务端channel
                    .channel (NioServerSocketChannel.class)
                    // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
                    // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option (ChannelOption.SO_BACKLOG, 1024)
                    .childHandler (new ChannelInitializer<SocketChannel> () {//创建通道初始化对象,设置初始化参数
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //每一个链接过来(每一次创建channel)都会执行这个方法
                            System.out.println ("初始化pipeline");
                            //编码,将String转码为ByteBuf
                            ch.pipeline ().addLast ("encode", new StringEncoder ());
                            //用分割符处理粘包问题,分隔符可以是多个,
                            // 1024代表1024个字节内还没有找到分隔符抛出异常,TooLongFrameException,
                            //如果后续的Handler重写了exceptionCaught方法就会调用exceptionCaught方法
                            //DelimiterBasedFrameDecoder分割是转义成ByteBuf才分割的,所以如果添加了StringDecoder的话,
                            // 要把StringDecoder放在DelimiterBasedFrameDecoder的后面
                            ByteBuf delimiter = Unpooled.copiedBuffer ("$_$".getBytes ());
                            ch.pipeline ().addLast (new DelimiterBasedFrameDecoder (1024, delimiter));
                            //解码,将ByteBuf解码为String
                            ch.pipeline ().addLast ("decode", new StringDecoder ());
                            //handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
                            ch.pipeline ().addLast (new NettyServerHandler ());
                        }
                    });
            System.out.println ("netty server start。。");
            //绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
            //启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
            //我们可以去掉sync,添加事件监听,如果链接成功失败相对应的处理
            ChannelFuture cf = bootstrap.bind (9000);
            //给cf注册监听器,监听我们关心的事件
            cf.addListener (new ChannelFutureListener () {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess ()) {
                        //这里只是简单的打印,
                        System.out.println ("监听端口9000成功");
                    } else {
                        System.out.println ("监听端口9000失败");
                    }
                }
            });
            //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
            cf.channel ().closeFuture ().sync ();
        } finally {
            bossGroup.shutdownGracefully ();
            workerGroup.shutdownGracefully ();
        }
    }
}

服务端的handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println ("channelActive打印" + Thread.currentThread ().getName () + ctx.channel ().remoteAddress ());
        //如果此handler后续还有handler的话,只有调用了fireXXX才能向下继续调用
        ctx.fireChannelActive ();
    }

    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文对象, 含有通道channel,管道pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         //使用了Stingdecoder所以这里接受到消息就是string类型的,不需要再次抓交换
         System.out.println ("客户端发送消息是:" +msg);
        //这是返回给客户端消息,如果使用StringEncoder,直接发String类型就可以,如果没有使用就先转为ByteBuf
        ByteBuf buf1 = Unpooled.copiedBuffer ("HelloClient$_$", CharsetUtil.UTF_8);
        Channel channel = ctx.channel ();
        channel.writeAndFlush (buf1);
    }

    /**
     * 数据读取完毕处理方法,这个方法个人认为不太好用,
     *
     * @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
     * System.out.println ("服务端接受消息结束");
     * ByteBuf buf = Unpooled.copiedBuffer ("HelloClient$_$", CharsetUtil.UTF_8);
     * ctx.writeAndFlush (buf);
     * }
     */
    @Override
    //处理异常, 一般是需要关闭通道
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println (cause.getMessage ());
        ctx.close ();
    }
}

2. FixedLengthFrameDecoder定长,这个也不常用

//每一个链接过来(每一次创建channel)都会执行这个方法

System.out.println ("初始化pipeline");
//编码,将String转码为ByteBuf
ch.pipeline ().addLast ("encode", new StringEncoder ());
//固定接受10个字节,如果超过这个字节数,超出的部分存在tcp的缓存中,等待下一次传输,超出的部分不会丢弃掉
ch.pipeline ().addLast (new FixedLengthFrameDecoder (10));
//解码,将ByteBuf解码为String
ch.pipeline ().addLast ("decode", new StringDecoder ());
//handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
ch.pipeline ().addLast (new NettyServerHandler ());

客户端发送的消息为HelloServer11个字节,可以看到超出的r会放到下一次的消息中,
一个汉字为3个字节,所以很难在实际项目中使用
客户端发送消息是:HelloServe
客户端发送消息是:rHelloServ

3. LineBasedFrameDecoder 换行符在发送消息加上换行符 \n

System.out.println ("初始化pipeline");
//编码,将String转码为ByteBuf
ch.pipeline ().addLast ("encode", new StringEncoder ());
//如果字节超过1024还没有找到换行符则抛出异常
ch.pipeline ().addLast (new LineBasedFrameDecoder (1024));
//解码,将ByteBuf解码为String
ch.pipeline ().addLast ("decode", new StringDecoder ());
//handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
ch.pipeline ().addLast (new NettyServerHandler ());

4. LengthFieldBasedFrameDecoder和LengthFieldPrepender

LengthFieldPrepender编码器,将发送消息的前面加上请求体的字节长度
LengthFieldBasedFrameDecoder获取请求头的长度,根据长度获取请求体的信息
个人认为这个比较常用
客户端

public class NettyClient {
    public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup ();
        try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap ();
//设置相关参数
            bootstrap.group (group) //设置线程组
                    .channel (NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现
                    .handler (new ChannelInitializer<SocketChannel> () {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            //规定标记消息提长度所占字节数
                            channel.pipeline().addLast(new LengthFieldPrepender (2));
                            channel.pipeline ().addLast (new NettyClientHandler ());
                        }
                    });
            System.out.println ("netty client start");
            //启动客户端去连接服务器端
            ChannelFuture channelFuture = bootstrap.connect ("127.0.0.1", 9000).sync ();
            //对关闭通道进行监听
            channelFuture.channel ().closeFuture ().sync ();
        } finally {
            group.shutdownGracefully ();
        }
    }
}

客户端handler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println ("发送消息");
        for (int i = 0; i < 2; i++) {
            //拆包用分隔符
            ByteBuf buf = Unpooled.copiedBuffer ("HelloServer"+i, CharsetUtil.UTF_8);
            ctx.writeAndFlush (buf);
        }
        ctx.fireChannelActive ();
    }
    //当通道有读取事件时会触发,即服务端发送数据给客户端
    //msg就额是接受的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = ( ByteBuf ) msg;
        //因为已经使用了StringDecoder传过来的已经string类型
        System.out.println ("收到服务端的消息:" + buf);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace ();
        ctx.close ();
    }
}

服务端

public class NettyServer {

    public static void main(String[] args) throws Exception {

        //创建两个线程组bossGroup和workerGroup,
        //NioEventLoop的个数默认为cpu核数的两倍
        // bossGroup只是处理连接请求 ,真正的和客户端业务处理,workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup (1);
        EventLoopGroup workerGroup = new NioEventLoopGroup (8);
        try {
            //创建服务器端的引导对象
            ServerBootstrap bootstrap = new ServerBootstrap ();
            //将两个线程组放进引导对象
            bootstrap.group (bossGroup, workerGroup)
                    //NioServerSocketChannel服务端channel
                    .channel (NioServerSocketChannel.class)
                    // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
                    // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option (ChannelOption.SO_BACKLOG, 1024)
                    .childHandler (new ChannelInitializer<SocketChannel> () {//创建通道初始化对象,设置初始化参数
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //每一个链接过来(每一次创建channel)都会执行这个方法
                            System.out.println ("初始化pipeline");

                            // 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据
                            // 进行长度字段解码,这里也会对数据进行粘包和拆包处理
                            //maxFrameLength:指定了每个包所能传递的最大数据包大小;
                            //lengthFieldOffset:指定了长度字段在字节码中的偏移量;
                            //lengthFieldLength:指定了长度字段所占用的字节长度;
                            //lengthAdjustment:对一些不仅包含有消息头和消息体的数据进行消息头的长度的调整,这样就可以只得到消息体的数据,这里的lengthAdjustment指定的就是消息头的长度;
                            //initialBytesToStrip:对于长度字段在消息头中间的情况,可以通过initialBytesToStrip忽略掉消息头以及长度字段占用的字节。
                            //1024最大数据包长度,包括长度所占的字节数
                            //0,因为第一字符开始就是长度
                            //2消息体长度所占字节数
                            //0因为第一个字符就是长度
                            //2去掉长度所占字节数,获取剩下的消息体
                            //注2就是下面LengthFieldPrepender中的规定长度所占的字节数
                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder (1024, 0, 2, 0, 2));
                            // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
                            ch.pipeline().addLast(new LengthFieldPrepender (2));
                            //handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
                            //StringDecoder要放到LengthFieldBasedFrameDecoder后面,对得到消息体的ByteBuf转码为String类型,个人认为这个比较常用
                            ch.pipeline ().addLast ("encode", new StringDecoder ());
                            ch.pipeline ().addLast (new NettyServerHandler ());
                        }
                    });
            System.out.println ("netty server start。。");
            //绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
            //启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
            //我们可以去掉sync,添加事件监听,如果链接成功失败相对应的处理
            ChannelFuture cf = bootstrap.bind (9000);
            //给cf注册监听器,监听我们关心的事件
            cf.addListener (new ChannelFutureListener () {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess ()) {
                        //这里只是简单的打印,
                        System.out.println ("监听端口9000成功");
                    } else {
                        System.out.println ("监听端口9000失败");
                    }
                }
            });
            //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
            cf.channel ().closeFuture ().sync ();
        } finally {
            bossGroup.shutdownGracefully ();
            workerGroup.shutdownGracefully ();
        }
    }
}

服务端handler


/**
 * 继承了ChannelInboundHandlerAdapter 属于Inbound,输入的处理器
 */
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println ("channelActive打印" + Thread.currentThread ().getName () + ctx.channel ().remoteAddress ());
        //如果此handler后续还有handler的话,只有调用了fireXXX才能向下继续调用
        ctx.fireChannelActive ();
    }

    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文对象, 含有通道channel,管道pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         //使用了Stingdecoder所以这里接受到消息就是string类型的,不需要再次抓交换
         System.out.println ("客户端发送消息是:" +msg);
    }

    /**
     * 数据读取完毕处理方法,这个方法个人认为不太好用,
     *
     * @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
     * System.out.println ("服务端接受消息结束");
     * ByteBuf buf = Unpooled.copiedBuffer ("HelloClient$_$", CharsetUtil.UTF_8);
     * ctx.writeAndFlush (buf);
     * }
     */
    @Override
    //处理异常, 一般是需要关闭通道
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println (cause.getMessage ());
        ctx.close ();
    }
}

相关文章

网友评论

      本文标题:netty解决粘包的4种方案

      本文链接:https://www.haomeiwen.com/subject/ktxogltx.html