#Netty 中的编解码器的顺序问题:
>> 出站处理器(ChannelOutboundHandler):
其write操作是从ChannelPipeline的尾部到头部依次进行编码
>> 入站处理器(ChannelInboundHandler):
其read操作是从ChannelPipeline的头部到尾部依次进行解码
#假设 Server 与 Client 处的 ChannelHandler 都是:
ch.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(4096, 0, 4, 0, 4)
.addLast(new StringDecoder())
.addLast(new LengthFieldPrepender(4))
.addLast(new StringEncoder())
则流程为:
>> client 发消息: 先通过StringEncoder编码, 再通过LengthFieldPrepender编码
>> server 收消息: 先通过LengthFieldBasedFrameDecoder解码, 再通过StringDecoder解码
>> server 回消息: 先通过StringEncoder编码, 再通过LengthFieldPrepender编码
>> client 收消息: 先通过LengthFieldBasedFrameDecoder解码, 再通过StringDecoder解码
1.概述
1.1 粘包拆包问题描述
传输层除了有TCP协议外还有UDP协议。
#UDP
不会发生粘包或拆包现象
因为UDP是基于报文发送的,从UDP的帧结构可以看出,
在UDP首部采用了16bit来指示UDP数据报文的长度,
因此在应用层能很好的将不同的数据报文区分开,从而避免粘包和拆包的问题。
#TCP
是基于字节流的,在基于流的传输里(如TCP/IP),接收到的数据会先被存储到一个socket接收缓冲里。
不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。
TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行化包的划分,
所以在业务上认为,一个完整的包可能会被TCP拆成多个包进行发送,
也有多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
粘包拆包问题.png
1.2 粘包拆包产生的原因
服务端和客户端都会造成粘包、半包问题,以下列出常见原因。
#服务端:
>> 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。
>> 要发送的数据大于MSS(最大报文长度),TCP在传输前将进行拆包。
>> 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。
#接收端:
>> 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。
1.3 粘包拆包问题的解决思路
TCP以流的方式进行数据传输,由于底层TCP无法理解上层的业务数据,
所以在底层是无法保证数据包不被拆分和重组的,
这个问题只能通过上层的应用协议栈设计来解决,上层的应用协议为了对消息进行区分。
#业界主流的解决方案归纳如下:
>> 客户端在发送数据包的时候,每个包都固定长度,比如1024个字节大小,
如果客户端发送的数据长度不足1024个字节,则通过补充空格的方式补全到指定长度;
>> 客户端在每个包的末尾使用固定的分隔符,例如\r\n,
如果一个包被拆分了,则等待下一个包发送过来之后找到其中的\r\n,
然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包;
>> 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,
只有在读取到足够长度的消息之后才算是读到了一个完整的消息;
>> 更复杂的应用层协议。
>> 自定义协议解决 (可参考 Dubbo)。
2.Netty中粘包拆包问题的解决方案
#1.FixedLengthFrameDecoder
对于使用固定长度的粘包和拆包场景,可以使用FixedLengthFrameDecoder,该解码一器会每次读取固定长度的消息,
如果当前读取到的消息不足指定长度,那么就会等待下一个消息到达后进行补足。
其使用也比较简单,只需要在构造函数中指定每个消息的长度即可。
这里需要注意的是,FixedLengthFrameDecoder只是一个解码器,Netty也只提供了一个解码器,
这是因为对于解码是需要等待下一个包的进行补全的,代码相对复杂,
对于编码器,用户可以自行编写,因为编码时只需要将不足指定长度的部分进行补全即可。
#2.LineBasedFrameDecoder与DelimiterBasedFrameDecoder
这俩适用于通过分隔符进行粘包和拆包问题的处理。
>> LineBasedFrameDecoder的作用主要是通过换行符,即\n或者\r\n对数据进行处理;
>> DelimiterBasedFrameDecoder的作用则是通过用户指定的分隔符对数据进行粘包和拆包处理。
这两个类都是解码器类,而对于数据的编码,
也即在每个数据包最后添加换行符或者指定分割符的部分需要用户自行进行处理。
#3.LengthFieldBasedFrameDecoder与LengthFieldPrepender
二者需要配合起来使用,其实本质上来讲,这两者一个是解码,一个是编码的关系。
它们处理粘拆包的主要思想是在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。
##3.1 LengthFieldBasedFrameDecoder(基于数据包长度的拆包器):
会按照参数指定的包长度偏移量数据对接收到的数据进行解码,从而得到目标消息体数据;
>> maxFrameLength:指定了每个包所能传递的最大数据包大小;
>> lengthFieldOffset:指定了长度字段在字节码中的偏移量;
>> lengthFieldLength:指定了长度字段所占用的字节长度;
>> lengthAdjustment:对一些不仅包含有消息头和消息体的数据进行消息头的长度的调整,
这样就可以只得到消息体的数据,这里的lengthAdjustment指定的就是消息头的长度;
>> initialBytesToStrip:对于长度字段在消息头中间的情况,
可以通过initialBytesToStrip忽略掉消息头以及长度字段占用的字节。
将应用层数据包的长度,作为接收端应用层数据包的拆分依据。
按照应用层数据包的大小,拆包。
这个拆包器,有一个要求,就是应用层协议中包含数据包的长度。
>> failFast
true: 读取到长度域超过maxFrameLength,就抛出一个 TooLongFrameException。
false: 只有真正读取完长度域的值表示的字节之后,才会抛出 TooLongFrameException,
默认情况下设置为true,建议不要修改,否则可能会造成内存溢出
>> ByteOrder: 数据存储采用大端模式或小端模式
LengthFieldBasedFrameDecoder与LengthFieldPrepender需要配合起来使用,
其实本质上来讲,这两者一个是解码,一个是编码的关系。
它们处理粘拆包的主要思想是在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。
LengthFieldBasedFrameDecoder会按照参数指定的包长度
偏移量数据对接收到的数据进行解码,从而得到目标消息体数据;
LengthFieldPrepender则会在响应的数据前面添加指定的字节数据,
这个字节数据中保存了当前消息体的整体字节数据长度。
数据在编码发送的时候,会指定当前这条消息的长度。
2.LengthFieldPrepender则会在响应的数据前面添加指定的字节数据,
这个字节数据中保存了当前消息体的整体字节数据长度。
#自定义粘包与拆包器
>> 方案1:
通过继承LengthFieldBasedFrameDecoder和LengthFieldPrepender来实现粘包和拆包的处理。
>> 方案2:
通过继承MessageToByteEncoder和ByteToMessageDecoder来实现。
这里MessageToByteEncoder的作用是将响应数据编码为一个ByteBuf对象,
而ByteToMessageDecoder则是将接收到的ByteBuf数据转换为某个对象数据。
通过实现这两个抽象类,用户就可以达到实现自定义粘包和拆包处理的目的。
2.1 LineBasedFrameDecoder + StringDecoder (基于回车换行符解决粘包拆包问题)
package com.zy.netty.netty01;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.LongAdder;
@Slf4j
public class Server01 {
public static void main(String[] args) {
ServerBootstrap server = new ServerBootstrap();
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("server-bossGroup", true));
NioEventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new DefaultThreadFactory("server-workerGroup", true));
try {
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline()
// 这里结合 LineBasedFrameDecoder + StringDecoder 实现粘包拆包的处理, 其中 LineBasedFrameDecoder 是基于 \n 或 \r\n 实现
// 这里需要注意: 单条消息不能超过给定的最大限度, 否则会抛出异常
.addLast("lineBasedFrameDecoder", new LineBasedFrameDecoder(1024))
.addLast("stringDecoder", new StringDecoder())
.addLast("serverhandler01", new ServerHandler01());
}
});
ChannelFuture channelFuture = server.bind("127.0.0.1", 8099).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server is error ..........", e);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
private static class ServerHandler01 extends SimpleChannelInboundHandler<Object> {
private LongAdder counter = new LongAdder();
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
counter.increment();
String body = (String) msg;
System.out.println("server01 received msg: " + body + "; the counter is: " + counter.doubleValue());
String currentTime = "Query Time Order".equalsIgnoreCase(body) ? LocalDateTime.now().toString() : "Bad Order";
// 发送消息时, 每条消息结尾需要添加回车换行符
currentTime += System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(resp);
}
}
}
package com.zy.netty.netty01;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.LongAdder;
@Slf4j
public class Client01 {
public static void main(String[] args) {
Bootstrap client = new Bootstrap();
NioEventLoopGroup executors = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new DefaultThreadFactory("client-executor", true));
try {
client.group(executors)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline()
.addLast("clentLogginHandler", new LoggingHandler(LogLevel.DEBUG))
// 这里结合 LineBasedFrameDecoder + StringDecoder 实现粘包拆包的处理, 其中 LineBasedFrameDecoder 是基于 \n 或 \r\n 实现
// 这里需要注意: 单条消息不能超过给定的最大限度, 否则会抛出异常
.addLast("lineBasedFrameDecoder", new LineBasedFrameDecoder(1024))
.addLast("stringDecoder", new StringDecoder())
.addLast("clienthandler01", new ClientHandler01());
}
});
ChannelFuture channelFuture = client.connect("127.0.0.1", 8099).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client is error -----------", e);
} finally {
executors.shutdownGracefully();
}
}
private static class ClientHandler01 extends SimpleChannelInboundHandler<Object> {
private LongAdder counter = new LongAdder();
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
counter.increment();
String body = (String) msg;
System.out.println("client receive msg: " + body + "; the counter is: " + counter.doubleValue());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String req = "Query Time Order";
// 发送消息时, 每条消息结尾需要添加回车换行符
req += System.getProperty("line.separator");
byte[] reqBytes = req.getBytes(StandardCharsets.UTF_8);
ByteBuf buf;
for (int i = 0; i < 50; i++) {
buf = Unpooled.buffer(reqBytes.length);
buf.writeBytes(reqBytes);
ctx.writeAndFlush(buf);
}
}
}
}
2.2 DelimiterBasedFrameDecoder + StringDecoder (自定义分隔符解决粘包拆包问题)
这里模拟一个简易聊天室
package com.zy.netty.netty02;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@Slf4j
public class Server02 {
private static final String DELIMITER_SEPARATOR = "$_";
public static void main(String[] args) {
ServerBootstrap server = new ServerBootstrap();
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("server-bossGroup", true));
NioEventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new DefaultThreadFactory("server-workerGroup", true));
try {
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline()
// 这里结合 DelimiterBasedFrameDecoder + StringDecoder 实现粘包拆包的处理, 其中 DelimiterBasedFrameDecoder 需要自定义分隔符, 否则走默认值
// 这里需要注意: 单条消息不能超过给定的最大限度, 否则会抛出异常
.addLast("$_delimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(4096, Unpooled.copiedBuffer(DELIMITER_SEPARATOR.getBytes(StandardCharsets.UTF_8))))
.addLast("stringDecoder", new StringDecoder())
.addLast("stringEncoder", new StringEncoder())
.addLast(new ServerHandler02());
}
});
ChannelFuture channelFuture = server.bind("127.0.0.1", 8090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server is error ..........", e);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
@ChannelHandler.Sharable
private static class ServerHandler02 extends SimpleChannelInboundHandler<Object> {
// 保存所有与 服务端 建立好连接的 客户端的 channel 对象
// FIXME 分布式场景下, 是否可以存储到 redis 中 ?
private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
channels.forEach(ch -> {
if (Objects.equals(ch, channel)) {
ch.writeAndFlush("self send msg: " + msg + DELIMITER_SEPARATOR);
} else {
ch.writeAndFlush("remoteAddress: " + channel.remoteAddress() + " send msg: " + msg + DELIMITER_SEPARATOR);
}
});
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 有客户端加入的事件
Channel channel = ctx.channel();
// 广播:
channels.add(channel);
channels.writeAndFlush("remoteAddress: " + channel.remoteAddress() + " join the server" + DELIMITER_SEPARATOR);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 有客户端离开的事件
Channel channel = ctx.channel();
// 广播:
channels.writeAndFlush("remoteAddress: " + channel.remoteAddress() + " leave the server" + DELIMITER_SEPARATOR);
channels.remove(channel);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 客户端处于活动状态
Channel channel = ctx.channel();
System.out.println("remoteAddress: " + channel.remoteAddress() + " is online.");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 客户端处于非活动状态
Channel channel = ctx.channel();
System.out.println("remoteAddress: " + channel.remoteAddress() + " is offline.");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
package com.zy.netty.netty02;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.LongAdder;
/**
* 这里多启动几个 client, 即可测试
*/
@Slf4j
public class Client02 {
private static final String DELIMITER_SEPARATOR = "$_";
public static void main(String[] args) {
Bootstrap client = new Bootstrap();
NioEventLoopGroup executors = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new DefaultThreadFactory("client-executor", true));
try {
client.group(executors)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline()
// 这里结合 DelimiterBasedFrameDecoder + StringDecoder 实现粘包拆包的处理, 其中 DelimiterBasedFrameDecoder 需要自定义分隔符, 否则走默认值
// 这里需要注意: 单条消息不能超过给定的最大限度, 否则会抛出异常
.addLast("$_delimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(4096, Unpooled.copiedBuffer(DELIMITER_SEPARATOR.getBytes(StandardCharsets.UTF_8))))
.addLast("stringDecoder", new StringDecoder())
.addLast("stringEncoder", new StringEncoder())
.addLast("clientHandler02", new ClientHandler02());
}
});
ChannelFuture channelFuture = client.connect("127.0.0.1", 8090).sync();
// 死循环, 监听键盘输入, 客户端启动后(可以多启动几个客户端), 输入任意消息, 回车即可
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
channelFuture.channel().writeAndFlush(reader.readLine() + DELIMITER_SEPARATOR);
}
// channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("client is error -----------", e);
} finally {
executors.shutdownGracefully();
}
}
@ChannelHandler.Sharable
private static class ClientHandler02 extends SimpleChannelInboundHandler<Object> {
private LongAdder counter = new LongAdder();
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
counter.increment();
String body = (String) msg;
System.out.println("client receive msg: " + body + "; the counter is: " + counter.doubleValue());
}
}
}
2.3 LengthFieldBasedFrameDecoder + LengthFieldPrepender
LengthFieldBasedFrameDecoder的解码过程.png LengthFieldPrepender的编码过程.png参考资料
https://my.oschina.net/zhangxufeng/blog/3023794 (netty中粘包拆包)
https://mp.weixin.qq.com/s/uNrq1EOl6frrX8fAg2-bHw
李林锋. (2015). Netty权威指南(第2版).
网友评论