本文内容都是学习Netty权威指南一书所写, 主要是为了加深对Netty内容的理解。
Netty 粘包与拆包
TCP协议是一个“流协议栈”, 像河流一样, 数据与数据之间没有分界线, 这种情况下就会出现一个完整的数据包被TCP拆分成多个包进行发送(拆包), 同时也有可能多个小的数据包被TCP封装成一个打的数据包发送(粘包)。这里以简单的代码举例。代码参考Netty权威指南一书。
粘包
Server端代码如下:
package com.kason.netty.chapter3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class TimeServer {
public static void main(String[] args) {
int port = 8080;
if(args != null && args.length > 0){
try{
port = Integer.parseInt(args[0]);
}catch (NumberFormatException e){
//使用默认值
}
}
try {
new TimeServer().bind(port);
} catch (Exception e) {
e.printStackTrace();
}
}
public void bind(int port) throws Exception{
//配置服务端的NIO线程组, NioEventLoopGroup是专门用于网络事件的处理
//bossGroup用于服务端接收客户端的链接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//workGroup用于进行SocketChannel的网络读写
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
//ServerBootstrap是专门用于启动NIO服务端的辅助启动类;
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//处理网络IO事件
.childHandler(new ChildChannelHandler());
//绑定成功,, 同步等待成功
ChannelFuture f = b.bind(port).sync();
//等待服务器端监听结束
f.channel().closeFuture().sync();
}finally {
//优雅退出释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}
}
package com.kason.netty.chapter3;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.Date;
public class TimeServerHandler extends ChannelHandlerAdapter {
private int count = 0;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
body = body.substring(0, req.length - System.getProperty("line.separator").length());
System.out.println("The time server receiver order: " + body + "====条数 " + ++count);
//String currentTime = "Query Time Order".equalsIgnoreCase(body)? new Date(System.currentTimeMillis()).toString(): "Bad Order";
String currentTime = System.currentTimeMillis() + "";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
客户端代码:
package com.kason.netty.chapter3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TimeCLient {
public void connect(int port, String host) throws Exception{
//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步链接
ChannelFuture f = b.connect(host, port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
int port = 8080;
if(args != null && args.length >0 ){
try {
port = Integer.parseInt(args[0]);
}catch (NumberFormatException e){
}
}
new TimeCLient().connect(port,"127.0.0.1");
}
}
package com.kason.netty.chapter3;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TimeClientHandler extends ChannelHandlerAdapter{
private static final Logger logger = LoggerFactory.getLogger(TimeClientHandler.class.getName());
private ByteBuf firstMessage;
private byte[] req;
public TimeClientHandler() {
/*StringBuilder sb = new StringBuilder("");
for(int i = 0; i < 1000; i++){
sb.append("hello");
}
//byte[] req = "Query Time Order".getBytes();
byte[] req = sb.toString().getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);*/
req = ("Query Time Order" + System.getProperty("line.separator")).getBytes();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
for( int i =0; i< 100; i++){
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
//ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
logger.info("Now is " + body);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("READ COMPLETE");
}
}
观看一下粘包现象:
image.png image.png
根据代码客户端明明发送了100次,并且每次都调用了flush, 可服务端这边显示的count确实6, 说明客户端发给服务端的包出现了粘包, 那么既然服务端收到了6次也就是会返回6次时间给客户端, 不过客户端确只显示一条, 说明服务端返回给客户端的数据也发生了粘包。
粘包解决
修改Server端的ChildChannelHandler
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
//socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}
修改客户端的
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
Server端结果:
image.png
可知道Server确实收到了客户端的100条数据
image.png
可以看到Client端也收到了Server端的100条回复。
拆包
拆包代码大致如下:
CLient端代码:Client为了模拟大的发送包, 因此我发送的数据包是1000个hello组成的字符串, 这样在Server接收时,这个包就会被拆分掉多个小的数据包
package com.kason.netty.chapter3.chaibao;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class ChaiBaoClient {
public void connect(int port, String host) throws Exception{
//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChaiBaoCLientHandler());
}
});
//发起异步链接
ChannelFuture f = b.connect(host, port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
int port = 8080;
if(args != null && args.length >0 ){
try {
port = Integer.parseInt(args[0]);
}catch (NumberFormatException e){
}
}
new ChaiBaoClient().connect(port,"127.0.0.1");
}
}
class ChaiBaoCLientHandler extends ChannelHandlerAdapter {
private ByteBuf message;
public ChaiBaoCLientHandler() {
//为了模拟拆包, 客户端将其发送的数据设置很大
StringBuilder sb = new StringBuilder("");
for(int i = 0; i< 1000; i++){
sb.append("hello");
}
byte[] data = sb.toString().getBytes();
message = Unpooled.buffer(data.length);
message.writeBytes(data);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(message);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf readObj = (ByteBuf)msg;
byte[] readResult = new byte[readObj.readableBytes()];
readObj.readBytes(readResult);
String reStr = new String(readResult, "UTF-8");
System.out.println("client receive from server: " + reStr);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("client read complete");
}
}
Server端代码:
package com.kason.netty.chapter3.chaibao;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class ChaiBaoServer {
public static void main(String[] args) {
int port = 8080;
if(args != null && args.length > 0){
try{
port = Integer.parseInt(args[0]);
}catch (NumberFormatException e){
//使用默认值
}
}
try {
new ChaiBaoServer().bind(port);
} catch (Exception e) {
e.printStackTrace();
}
}
public void bind(int port) throws Exception{
//配置服务端的NIO线程组, NioEventLoopGroup是专门用于网络事件的处理
//bossGroup用于服务端接收客户端的链接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//workGroup用于进行SocketChannel的网络读写
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
//ServerBootstrap是专门用于启动NIO服务端的辅助启动类;
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//处理网络IO事件
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerHandler());
}
});
//绑定成功,, 同步等待成功
ChannelFuture f = b.bind(port).sync();
//等待服务器端监听结束
f.channel().closeFuture().sync();
}finally {
//优雅退出释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
class ServerHandler extends ChannelHandlerAdapter{
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf readFromClient = (ByteBuf)msg;
byte[] readbytes = new byte[readFromClient.readableBytes()];
readFromClient.readBytes(readbytes);
String readStr = new String(readbytes, "UTF-8");
System.out.println("client send to server " + readStr);
String currentTime = System.currentTimeMillis()+" ";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
Server端收到结果如下:
image.png
从图中可以知道Server应该只收到一次,然后图中却收到了5次,而且杂乱不堪。
CLient结果:
image.png
client理论上被Server返回了5次,却粘在一起了,所以此例子是Server端接收发生了拆包,客户端接收发生了粘包。
拆包解决
制定协议:这里仅仅是解释这种现象的解决方法, 并不会做深入的协议定制。比如最简单的方式我们定义前4位代表长度(因为一个整型int占四个字节),后面是真实字节数。
首先先写个int 转byte和byte转int的工具类, 方便调用。
package com.kason.netty.chapter3.chaibao;
public class utils {
public static byte[] intToBytes( int value ) {
byte[] src = new byte[4];
src[3] = (byte) ((value>>24) & 0xFF);
src[2] = (byte) ((value>>16) & 0xFF);
src[1] = (byte) ((value>>8) & 0xFF);
src[0] = (byte) (value & 0xFF);
return src;
}
public static int bytesToInt(byte[] src, int offset) {
int value;
value = (int) ((src[offset] & 0xFF)
| ((src[offset+1] & 0xFF)<<8)
| ((src[offset+2] & 0xFF)<<16)
| ((src[offset+3] & 0xFF)<<24));
return value;
}
}
当我们制定好规则之后就是编写解码器
package com.kason.netty.chapter3.chaibao;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
public class MsgDecoder extends ByteToMessageDecoder {
public static final int HEAD_LENGTH = 4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("shoudao meaasge, decode");
if (in.readableBytes() < HEAD_LENGTH) { //这个HEAD_LENGTH是我们用于表示头长度的字节数。 由于Encoder中我们传的是一个int类型的值,所以这里HEAD_LENGTH的值为4.
return;
}
in.markReaderIndex(); //我们标记一下当前的readIndex的位置
byte[] da = new byte[4];
in.readBytes(da);
int dataLength = utils.bytesToInt(da,0);
//int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4
System.out.println("lenght " + dataLength);
if (dataLength < 0) { // 我们读到的消息体长度为0,这是不应该出现的情况,这里出现这情况,关闭连接。
ctx.close();
}
if (in.readableBytes() < dataLength) { //读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
in.resetReaderIndex();
return;
}
byte[] body = new byte[dataLength]; //传输正常
in.readBytes(body);
String re = new String(body,"UTF-8");
System.out.println("解码客户端的结果: " + re);
out.add(re);
}
}
客户端代码不变, 服务端代码修改如下
package com.kason.netty.chapter3.chaibao;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class ChaiBaoServer {
public static void main(String[] args) {
int port = 8080;
if(args != null && args.length > 0){
try{
port = Integer.parseInt(args[0]);
}catch (NumberFormatException e){
//使用默认值
}
}
try {
new ChaiBaoServer().bind(port);
} catch (Exception e) {
e.printStackTrace();
}
}
public void bind(int port) throws Exception{
//配置服务端的NIO线程组, NioEventLoopGroup是专门用于网络事件的处理
//bossGroup用于服务端接收客户端的链接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//workGroup用于进行SocketChannel的网络读写
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
//ServerBootstrap是专门用于启动NIO服务端的辅助启动类;
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//处理网络IO事件
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MsgDecoder());
socketChannel.pipeline().addLast(new ServerHandler());
}
});
//绑定成功,, 同步等待成功
ChannelFuture f = b.bind(port).sync();
//等待服务器端监听结束
f.channel().closeFuture().sync();
}finally {
//优雅退出释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
class Decoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
}
}
class ServerHandler extends ChannelHandlerAdapter{
//整行int占四个字节
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/*ByteBuf readFromClient = (ByteBuf)msg;
byte[] readbytes = new byte[readFromClient.readableBytes()];
readFromClient.readBytes(readbytes);
String readStr = new String(readbytes, "UTF-8");
System.out.println("client send to server " + readStr);*/
String readStr = (String)msg;
System.out.println("client send to server " + readStr);
String currentTime = System.currentTimeMillis()+" ";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
就是在事件处理ServerHandler之前先加入一个Decoder
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MsgDecoder());
socketChannel.pipeline().addLast(new ServerHandler());
}
});
最终结果:Server端:
image.png
客户端:
image.png
可以看到客户端发送1000个hello也就是5000个字节,Server一次性收到, 而不是被拆成好几个, 同事只返回客户端一次response,所以客户端只收到一个时间。
网友评论