2.未处理示例
public class TimeClientHandler extends ChannelHandlerAdapter {
private static final Logger logger = Logger
.getLogger(TimeClientHandler.class.getName());
private int counter;
private byte[] req;
/**
* Creates a client-side handler.
*/
public TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator"))
.getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf message = null;
for (int i = 0; i < 10; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is : " + body + " ; the counter is : "
+ ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 释放资源
logger.warning("Unexpected exception from downstream : "
+ cause.getMessage());
ctx.close();
}
}
public class TimeServerHandler extends ChannelHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8").substring(0, req.length
- System.getProperty("line.separator").length());
System.out.println("The time server receive order : " + body
+ " ; the counter is : " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
输出结果,发生了粘包,10个数据包在一起接收到
The time server receive order : QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER ; the counter is : 1
3.使用LineBasedFrameDecoder解决粘包问题
基于行来进行消息粘包拆包处理,使用了行尾控制字符(\n 或者\r\n)来解析消息数据
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeServerHandler());
}
}
输出结果:
The time server receive order : QUERY TIME ORDER ; the counter is : 1
The time server receive order : QUERY TIME ORDER ; the counter is : 2
The time server receive order : QUERY TIME ORDER ; the counter is : 3
The time server receive order : QUERY TIME ORDER ; the counter is : 4
The time server receive order : QUERY TIME ORDER ; the counter is : 5
The time server receive order : QUERY TIME ORDER ; the counter is : 6
The time server receive order : QUERY TIME ORDER ; the counter is : 7
The time server receive order : QUERY TIME ORDER ; the counter is : 8
The time server receive order : QUERY TIME ORDER ; the counter is : 9
The time server receive order : QUERY TIME ORDER ; the counter is : 10
4.DelimiterBasedFrameDecoder
以分隔符作为码流结束标识的消息的解码
下面示例:
DelimiterBasedFrameDecoder的分隔符为$_
static final String ECHO_REQ = "Hi, Lilinfeng. Welcome to Netty.$_";
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_"
.getBytes());
ch.pipeline().addLast(
new DelimiterBasedFrameDecoder(1024,
delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 当代客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
输出消息结果:
This is 1 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
This is 2 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
This is 3 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
This is 4 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
This is 5 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
This is 6 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
This is 7 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
This is 8 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
This is 9 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
This is 10 times receive client : [Hi, Lilinfeng. Welcome to Netty.]
5.FixedLengthFrameDecoder
固定长度解码
new FixedLengthFrameDecoder(20)表示固定长度20解码
public void bind(int port) throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
输出消息结果:为固定长度20
Receive client : [Hi, Lilinfeng. Welco]
Receive client : [me to Netty.$_Hi, Li]
Receive client : [linfeng. Welcome to ]
Receive client : [Netty.$_Hi, Lilinfen]
Receive client : [g. Welcome to Netty.]
Receive client : [$_Hi, Lilinfeng. Wel]
Receive client : [come to Netty.$_Hi, ]
Receive client : [Lilinfeng. Welcome t]
Receive client : [o Netty.$_Hi, Lilinf]
6.LengthFieldBasedFrameDecoder
自定义包协议,如果需要自定义协议,可以使用此解码器
https://www.jishux.com/p/00ad4d5d4d4ed50d
参考:
https://my.oschina.net/andylucc/blog/625315
https://blog.csdn.net/scythe666/article/details/51996268
网友评论