Netty粘包与拆包以及解决方式

作者: kason_zhang | 来源:发表于2017-12-02 17:14 被阅读70次

本文内容都是学习Netty权威指南一书所写, 主要是为了加深对Netty内容的理解。

Netty 粘包与拆包

TCP协议是一个面向“流”的协议, 像河流一样, 数据与数据之间没有分界线, 这种情况下就会出现一个完整的数据包被TCP拆分成多个包进行发送(拆包), 同时也有可能多个小的数据包被TCP封装成一个大的数据包发送(粘包)。这里以简单的代码举例, 代码参考《Netty权威指南》一书。

粘包

Server端代码如下:

package com.kason.netty.chapter3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;


/**
 * Time server used by the sticky-packet demo: listens on a TCP port and
 * installs a {@link TimeServerHandler} on every accepted channel.
 */
public class TimeServer {

    /**
     * Entry point. The first argument is used as the listening port when it
     * parses as an integer; otherwise the server listens on 8080.
     */
    public static void main(String[] args) {
        int port = 8080;
        boolean portSupplied = args != null && args.length > 0;
        if (portSupplied) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Not a number: keep the default port.
            }
        }

        TimeServer server = new TimeServer();
        try {
            server.bind(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Binds the server to {@code port} and blocks until the listening channel
     * is closed. Both event loop groups are shut down before returning.
     *
     * @param port TCP port to listen on
     * @throws Exception if binding or waiting on the channel fails
     */
    public void bind(int port) throws Exception {
        // Boss group accepts incoming client connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Worker group performs the reads and writes on accepted channels.
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            // Wait for the bind to complete, then for the server channel to close.
            ChannelFuture bindFuture = bootstrap.bind(port).sync();
            bindFuture.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads.
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    /** Sets up the pipeline of each accepted channel. */
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }
}
package com.kason.netty.chapter3;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Date;

/**
 * Server-side handler of the time demo. For every inbound buffer it prints
 * the received text together with a running counter and queues the current
 * time in milliseconds as the reply; the reply is flushed when the read
 * batch completes.
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

    /** Number of {@code channelRead} calls handled by this instance. */
    private int count = 0;

    /** Closes the channel on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    /**
     * Decodes the inbound buffer as UTF-8 text, logs it and writes the reply.
     *
     * @param ctx channel context used to write the reply
     * @param msg inbound message; must be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        String body;
        try {
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            body = new String(req, "UTF-8");
        } finally {
            // This handler consumes the message, so it must release it.
            buf.release();
        }

        // Strip a trailing line separator only when one is present. An upstream
        // LineBasedFrameDecoder has already removed it, and an unconditional
        // trim would then cut off a real character (or throw on short input).
        String lineSeparator = System.getProperty("line.separator");
        if (body.endsWith(lineSeparator)) {
            body = body.substring(0, body.length() - lineSeparator.length());
        }
        System.out.println("The time server receiver order: " + body + "====条数 " + ++count);

        // NOTE(review): the reply carries no line separator, so a client that
        // frames inbound data with LineBasedFrameDecoder cannot split the
        // replies - confirm whether a separator should be appended.
        String currentTime = System.currentTimeMillis() + "";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("UTF-8"));
        ctx.write(resp);
    }

    /** Flushes the replies queued by {@code channelRead}. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

}

客户端代码:

package com.kason.netty.chapter3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


/**
 * Client of the time demo: connects to the server and installs a
 * {@link TimeClientHandler} on the channel.
 */
public class TimeCLient {

    /**
     * Connects to {@code host:port} and blocks until the channel is closed.
     * The event loop group is shut down before returning.
     *
     * @param port remote TCP port
     * @param host remote host name or address
     * @throws Exception if connecting or waiting on the channel fails
     */
    public void connect(int port, String host) throws Exception {
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workers)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            // Wait for the connection, then for the channel to close.
            ChannelFuture connectFuture = bootstrap.connect(host, port).sync();
            connectFuture.channel().closeFuture().sync();
        } finally {
            workers.shutdownGracefully();
        }
    }

    /**
     * Entry point. The first argument is used as the server port when it
     * parses as an integer; otherwise 8080 is used. Always connects to 127.0.0.1.
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        boolean portSupplied = args != null && args.length > 0;
        if (portSupplied) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Not a number: keep the default port.
            }
        }

        TimeCLient client = new TimeCLient();
        client.connect(port, "127.0.0.1");
    }
}
package com.kason.netty.chapter3;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side handler of the time demo. When the channel becomes active it
 * sends the request line 100 times, flushing after each one, and logs every
 * reply buffer it receives.
 */
public class TimeClientHandler extends ChannelHandlerAdapter{

    private static final Logger logger = LoggerFactory.getLogger(TimeClientHandler.class.getName());

    /** Encoded request: "Query Time Order" followed by the platform line separator. */
    private final byte[] req;

    public TimeClientHandler() {
        // Encode explicitly as UTF-8: the server decodes the request with UTF-8.
        req = ("Query Time Order" + System.getProperty("line.separator"))
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /** Closes the channel on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    /** Sends the request 100 times, each in its own buffer with its own flush. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 100; i++) {
            ByteBuf message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    /**
     * Decodes the inbound buffer as UTF-8 text and logs it.
     *
     * @param ctx channel context
     * @param msg inbound message; must be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            byte[] resp = new byte[buf.readableBytes()];
            buf.readBytes(resp);
            String body = new String(resp, "UTF-8");
            logger.info("Now is " + body);
        } finally {
            // This handler consumes the message, so it must release it.
            buf.release();
        }
    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("READ COMPLETE");
    }
}

观看一下粘包现象:


image.png image.png

根据代码, 客户端明明发送了100次, 并且每次都调用了flush, 可服务端这边显示的count却是6, 说明客户端发给服务端的包出现了粘包; 既然服务端收到了6次, 也就会返回6次时间给客户端, 不过客户端却只显示一条, 说明服务端返回给客户端的数据也发生了粘包。

粘包解决

修改Server端的ChildChannelHandler

/**
 * Pipeline setup for accepted channels: inbound bytes are first split into
 * line-delimited frames, then handed to the business handler.
 */
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // A StringDecoder between the two handlers was left disabled in the original.
        ch.pipeline()
                .addLast(new LineBasedFrameDecoder(1024))
                .addLast(new TimeServerHandler());
    }
}

修改客户端的

// Client pipeline: split the inbound byte stream into line-delimited frames
// before it reaches the business handler.
.handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 1024 is the maximum frame length passed to the decoder.
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });

Server端结果:


image.png

可知道Server确实收到了客户端的100条数据


image.png
可以看到Client端也收到了Server端的100条回复。

拆包

拆包代码大致如下:
Client端代码: 为了模拟大的发送包, 这里发送的数据包是1000个hello组成的字符串, 这样在Server接收时, 这个包就会被拆分成多个小的数据包

package com.kason.netty.chapter3.chaibao;


import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


/**
 * Client of the split-packet demo: connects to the server and installs a
 * {@link ChaiBaoCLientHandler} on the channel.
 */
public class ChaiBaoClient {

    /**
     * Connects to {@code host:port} and blocks until the channel is closed.
     * The event loop group is shut down before returning.
     *
     * @param port remote TCP port
     * @param host remote host name or address
     * @throws Exception if connecting or waiting on the channel fails
     */
    public void connect(int port, String host) throws Exception {
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workers)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChaiBaoCLientHandler());
                        }
                    });

            // Wait for the connection, then for the channel to close.
            ChannelFuture connectFuture = bootstrap.connect(host, port).sync();
            connectFuture.channel().closeFuture().sync();
        } finally {
            workers.shutdownGracefully();
        }
    }

    /**
     * Entry point. The first argument is used as the server port when it
     * parses as an integer; otherwise 8080 is used. Always connects to 127.0.0.1.
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        boolean portSupplied = args != null && args.length > 0;
        if (portSupplied) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Not a number: keep the default port.
            }
        }

        ChaiBaoClient client = new ChaiBaoClient();
        client.connect(port, "127.0.0.1");
    }
}
/**
 * Client-side handler of the split-packet demo. It sends one large payload
 * ("hello" repeated 1000 times) when the channel becomes active and prints
 * every reply buffer it receives.
 */
class ChaiBaoCLientHandler extends ChannelHandlerAdapter {
    /** Payload written once in {@code channelActive}. */
    private final ByteBuf message;

    public ChaiBaoCLientHandler() {
        // A deliberately large payload so that the receiver sees it in several reads.
        StringBuilder sb = new StringBuilder("");
        for (int i = 0; i < 1000; i++) {
            sb.append("hello");
        }
        // Encode explicitly as UTF-8: the server decodes the payload with UTF-8.
        byte[] data = sb.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        // NOTE(review): the payload is written without a length header. A server
        // pipeline that contains MsgDecoder reads the first 4 bytes as a
        // little-endian body length - confirm which server this client targets.
        message = Unpooled.buffer(data.length);
        message.writeBytes(data);
    }

    /** Closes the channel on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    /** Sends the prepared payload in a single write. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(message);
    }

    /**
     * Decodes the inbound buffer as UTF-8 text and prints it.
     *
     * @param ctx channel context
     * @param msg inbound message; must be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf readObj = (ByteBuf) msg;
        try {
            byte[] readResult = new byte[readObj.readableBytes()];
            readObj.readBytes(readResult);
            String reStr = new String(readResult, "UTF-8");
            System.out.println("client receive from server: " + reStr);
        } finally {
            // This handler consumes the message, so it must release it.
            readObj.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client read complete");
    }
}

Server端代码:

package com.kason.netty.chapter3.chaibao;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Server of the split-packet demo (no frame decoder): every accepted channel
 * gets a {@link ServerHandler} that sees the raw inbound buffers.
 */
public class ChaiBaoServer {

    /**
     * Entry point. The first argument is used as the listening port when it
     * parses as an integer; otherwise the server listens on 8080.
     */
    public static void main(String[] args) {
        int port = 8080;
        boolean portSupplied = args != null && args.length > 0;
        if (portSupplied) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Not a number: keep the default port.
            }
        }

        ChaiBaoServer server = new ChaiBaoServer();
        try {
            server.bind(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Binds the server to {@code port} and blocks until the listening channel
     * is closed. Both event loop groups are shut down before returning.
     *
     * @param port TCP port to listen on
     * @throws Exception if binding or waiting on the channel fails
     */
    public void bind(int port) throws Exception {
        // Boss group accepts incoming client connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Worker group performs the reads and writes on accepted channels.
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });

            // Wait for the bind to complete, then for the server channel to close.
            ChannelFuture bindFuture = bootstrap.bind(port).sync();
            bindFuture.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads.
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

}
/**
 * Server-side handler of the split-packet demo. It prints every inbound
 * buffer as UTF-8 text and answers each one with the current time in
 * milliseconds followed by a space.
 */
class ServerHandler extends ChannelHandlerAdapter{
    /** Closes the channel on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    /**
     * Decodes the inbound buffer, prints it and writes the reply immediately.
     *
     * @param ctx channel context used to write the reply
     * @param msg inbound message; must be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf readFromClient = (ByteBuf) msg;
        String readStr;
        try {
            byte[] readbytes = new byte[readFromClient.readableBytes()];
            readFromClient.readBytes(readbytes);
            readStr = new String(readbytes, "UTF-8");
        } finally {
            // This handler consumes the message, so it must release it.
            readFromClient.release();
        }
        System.out.println("client send to server " + readStr);

        String currentTime = System.currentTimeMillis()+" ";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("UTF-8"));
        ctx.writeAndFlush(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

Server端收到结果如下:


image.png

按理说Server应该只收到一次, 然而从图中可以看到却收到了5次, 而且杂乱不堪。
CLient结果:


image.png
client理论上被Server返回了5次,却粘在一起了,所以此例子是Server端接收发生了拆包,客户端接收发生了粘包。

拆包解决

制定协议:这里仅仅是解释这种现象的解决方法, 并不会做深入的协议定制。比如最简单的方式, 我们定义前4个字节代表消息体长度(因为一个整型int占四个字节), 后面紧跟真实的消息体数据。
首先写个int转byte[]和byte[]转int的工具类(低位字节在前), 方便调用。

package com.kason.netty.chapter3.chaibao;

/**
 * Conversions between an {@code int} and its 4-byte little-endian encoding
 * (least significant byte first).
 */
public class utils {

    /**
     * Encodes {@code value} as 4 bytes, least significant byte first.
     *
     * @param value the integer to encode
     * @return a new 4-element array holding the encoded value
     */
    public static byte[] intToBytes( int value ) {
        byte[] encoded = new byte[4];
        for (int i = 0; i < encoded.length; i++) {
            // Byte i carries bits [8*i, 8*i + 7] of the value.
            encoded[i] = (byte) (value >>> (8 * i));
        }
        return encoded;
    }

    /**
     * Decodes the 4 bytes starting at {@code offset}, least significant byte first.
     *
     * @param src    array containing the encoded value
     * @param offset index of the least significant byte
     * @return the decoded integer
     */
    public static int bytesToInt(byte[] src, int offset) {
        int decoded = 0;
        for (int i = 0; i < 4; i++) {
            // Mask to 0..255 so that negative bytes do not sign-extend.
            decoded |= (src[offset + i] & 0xFF) << (8 * i);
        }
        return decoded;
    }
}

当我们制定好规则之后就是编写解码器

package com.kason.netty.chapter3.chaibao;

import java.util.List;


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;


/**
 * Length-prefixed frame decoder. Each frame is a 4-byte header holding the
 * body length (decoded with {@code utils.bytesToInt}, least significant byte
 * first) followed by the body, which is emitted as a UTF-8 {@code String}.
 */
public class MsgDecoder extends ByteToMessageDecoder {

    /** Size in bytes of the length header (one {@code int}). */
    public static final int HEAD_LENGTH = 4;

    /**
     * Tries to decode one frame from {@code in}. Nothing is consumed while the
     * header or the body is still incomplete, so the call is repeated when
     * more bytes arrive.
     *
     * @param ctx channel context; the channel is closed on a negative length
     * @param in  accumulated inbound bytes
     * @param out receives the decoded body as a {@code String}
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        System.out.println("shoudao meaasge, decode");

        // Wait until the whole length header is available.
        if (in.readableBytes() < HEAD_LENGTH) {
            return;
        }
        // Remember the frame start so the header can be re-read later.
        in.markReaderIndex();
        byte[] header = new byte[HEAD_LENGTH];
        in.readBytes(header);
        int dataLength = utils.bytesToInt(header, 0);
        System.out.println("lenght " + dataLength);
        if (dataLength < 0) {
            // A negative length is a corrupt header: drop what was received,
            // close the connection and stop. Without the return, the code
            // below would attempt to allocate an array of negative size.
            in.skipBytes(in.readableBytes());
            ctx.close();
            return;
        }

        // Body not fully received yet: rewind to the frame start and wait.
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }

        // Complete frame available: consume the body and emit it as text.
        byte[] body = new byte[dataLength];
        in.readBytes(body);
        String re = new String(body,"UTF-8");
        System.out.println("解码客户端的结果: " + re);
        out.add(re);
    }

}

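客户端需要在消息体前先写入4字节的长度头(例如先 message.writeBytes(utils.intToBytes(data.length)), 再写入 data), 否则MsgDecoder会把消息体的前4个字节当作长度; 客户端其余代码不变。服务端代码修改如下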

package com.kason.netty.chapter3.chaibao;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * Server of the split-packet demo with framing: every accepted channel gets
 * a {@link MsgDecoder} followed by a {@link ServerHandler}.
 */
public class ChaiBaoServer {

    /**
     * Entry point. The first argument is used as the listening port when it
     * parses as an integer; otherwise the server listens on 8080.
     */
    public static void main(String[] args) {
        int port = 8080;
        boolean portSupplied = args != null && args.length > 0;
        if (portSupplied) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Not a number: keep the default port.
            }
        }

        ChaiBaoServer server = new ChaiBaoServer();
        try {
            server.bind(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Binds the server to {@code port} and blocks until the listening channel
     * is closed. Both event loop groups are shut down before returning.
     *
     * @param port TCP port to listen on
     * @throws Exception if binding or waiting on the channel fails
     */
    public void bind(int port) throws Exception {
        // Boss group accepts incoming client connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Worker group performs the reads and writes on accepted channels.
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // The decoder runs first; the handler receives what it emits.
                            ch.pipeline().addLast(new MsgDecoder());
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });

            // Wait for the bind to complete, then for the server channel to close.
            ChannelFuture bindFuture = bootstrap.bind(port).sync();
            bindFuture.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads.
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

}
/**
 * Placeholder decoder with an empty {@code decode} body. The pipeline built
 * in this file installs {@code MsgDecoder}, not this class.
 */
class Decoder extends ByteToMessageDecoder {

    /** Intentionally empty: consumes no bytes and emits no messages. */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // no-op
    }
}

/**
 * Server-side handler used behind a decoder: it expects each inbound message
 * to be a {@code String}, prints it and answers with the current time in
 * milliseconds followed by a space.
 */
class ServerHandler extends ChannelHandlerAdapter {

    /** Closes the channel on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    /**
     * Prints the decoded message and writes the reply immediately.
     *
     * @param ctx channel context used to write the reply
     * @param msg inbound message; cast to {@code String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String decoded = (String) msg;
        System.out.println("client send to server " + decoded);

        byte[] timestamp = (System.currentTimeMillis() + " ").getBytes();
        ctx.writeAndFlush(Unpooled.copiedBuffer(timestamp));
    }

    /** Flushes anything still pending once the read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

就是在事件处理ServerHandler之前先加入一个Decoder

// Server pipeline: MsgDecoder is added first, so ServerHandler receives the
// messages the decoder emits rather than the raw inbound buffers.
.childHandler(new ChannelInitializer<SocketChannel>(){
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new MsgDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });

最终结果:Server端:


image.png

客户端:


image.png

可以看到客户端发送1000个hello也就是5000个字节, Server一次性收到, 而不是被拆成好几个, 同时只返回给客户端一次response, 所以客户端只收到一个时间。

相关文章

网友评论

    本文标题:Netty粘包与拆包以及解决方式

    本文链接:https://www.haomeiwen.com/subject/aqubbxtx.html