在上一篇文章中介绍了 LengthFieldPrepender + StringEncoder 长度前置的文本编码器 和 LengthFieldBasedFrameDecoder + StringDecoder 基本长度解析的文本解码器组合的使用。
LengthFieldBasedFrameDecoder 解码器的功能还是比较强大的,可以解析 字头 + 数据长度 + 消息体 的协议格式,不止 6 种,还可以有很多变种。
在编码的时候,LengthFieldPrepender 长度前置编码类,就有些局限,只能添加数据长度,再添加一个字头就实现不了,今天就在 LengthFieldPrepender 的基础上改造一下,实现增加字头编码的功能。
做过物联网的朋友都知道,硬件那边使用的都是字节数组,很少有文本类型的信息,所以文本编码器 StringEncoder 会换成 ByteArrayEncoder 字节数组编码器,文本解码器 StringDecoder 会换成 ByteArrayDecoder 字节数组解码器。
在处理业务逻辑的时候,我们处理的是一组字节数组,在给客户端发送指令的时候,也发送一组字节数组,客户端发给服务器的时候,服务器接收到的也是一组字节数组。
在项目的历史版本中,服务器端使用拼接的方式,拼接字头,拼接数据长度,然后发给客户端。在这个版本中,我要把字头做成公共的,数据长度也做成公共的,在业务处理器中接收完整的数据包,去掉字头,根据数据长度获取消息体,毕竟消息体是不固定的,至于解析处理的字节代表什么意思,在此可以忽略不计。
还是老规矩,客户端给服务器端发送100条数据,服务器端反馈给客户端100条数据,查看运行结果是否会发生粘包/拆包问题。
服务器端代码:
package com.test.nio.stickyBag;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* Netty服务器端
* @author 程就人生
* @date
* @Description
*
*/
public class TestServer {
public void bind(final int port){
// 配置服务端的Nio线程组,boosGroup负责新客户端接入
EventLoopGroup boosGroup = new NioEventLoopGroup();
// workerGroup负责I/O消息处理
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workerGroup)
// 线程组设置为非阻塞
.channel(NioServerSocketChannel.class)
//连接缓冲池的大小
.option(ChannelOption.SO_BACKLOG, 1024)
//设置通道Channel的分配器
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//设置长连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 采用匿名内部类的方式,声明hanlder
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
* 包长最大长度不能超过65535
* 长度偏移量 1 个字节
* 长度占用2个字节
* 长度调整-3
* 剥离初始字节0
*/
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,1,2,-3,0));
// Bytebuf 转 byte数组
ch.pipeline().addLast(new ByteArrayDecoder());
// 对要发出去的数据进行编码
/**
* 根据 LengthFieldPrepender解码类改写
* 长度占用2个字节
* 长度调整0
* 长度字段中包含长度本身占用的字节数,
* 字头为0XFE
* 字头占用1个字节
*/
ch.pipeline().addLast(new LengthHeaderFieldPrepender(2, 0, true, 0XFE, 1));
// byte数组 转 Bytebuf
ch.pipeline().addLast(new ByteArrayEncoder());
// 事件处理绑定,具体的业务逻辑处理
ch.pipeline().addLast(new ServerHandler());
}
});
// 绑定端口
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
// 服务端启动监听事件
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(Future<? super Void> future) throws Exception {
//启动成功后的处理
if (future.isSuccess()) {
System.out.println("服务器启动成功,Started Successed:" + port);
} else {
System.out.println("服务器启动失败,Started Failed:" + port);
}
}
});
// 等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
// 优雅退出
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] argo){
new TestServer().bind(8080);
}
}
/**
* 服务器端handler
* @author 程就人生
* @date
* @Description
*
*/
class ServerHandler extends ChannelInboundHandlerAdapter{
// 对接收的消息进行计数
private static int counter;
// I/O消息的接收处理
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
// 把接收到的内容输出到控制台
byte[] data = (byte[]) msg;
int dataLength = data.length;
ByteBuf buf = Unpooled.buffer(dataLength);
buf.writeBytes(data);
System.out.println("这里是服务器端控制台:" + ByteBufUtil.hexDump(buf).toUpperCase() + "计数:" + ++counter);
buf = Unpooled.buffer(15);
buf.writeByte(0X08);
buf.writeByte(0XD4);
buf.writeByte(0X9B);
buf.writeByte(0X06);
buf.writeByte(0XB6);
buf.writeByte(0X01);
buf.writeByte(0X11);
buf.writeByte(0X01);
buf.writeByte(0X01);
buf.writeByte(0X00);
buf.writeByte(0X12);
buf.writeByte(0X02);
buf.writeByte(0XF4);
buf.writeByte(0X01);
buf.writeByte(0X60);
// 返回信息给客户端
ctx.writeAndFlush(buf.array());
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 遇到异常时关闭ChannelHandlerContext
ctx.close();
}
}
在 59 行,我们对进来的数据先使用 LengthFieldBasedFrameDecoder 类进行解码,数据包长最大不能超过 65535,长度偏移量 1 个字节(字头的长度),长度占用2个字节,长度调整 -3 (字头占用的字节 + 数据长度占用的字节),剥离初始字节 0 个字节 (解析后还需要完整的数据包)。
经过 LengthFieldBasedFrameDecoder 解码后,在 61 行再经过 ByteArrayDecoder 字节数组解码器解码成字节数组,最后传递给 ServerHandler 进行具体的业务逻辑处理。
在业务处理类 ServerHandler 中的 channelRead 方法中,我们将接收到的字节数组在控制台打印,并给客户端发送消息体长度为 15 个字节的字节数组,加上消息头和数据长度,客户端收到的应该是 18 个字节的字节数组。在 128 行,使用工具类 ByteBufUtil.hexDump(buf).toUpperCase() 打印客户端发过来的数据到控制台。
在 72 行,使用 LengthHeaderFieldPrepender 编码类对将要发出的字节数组进行加工,加上字头和数据长度,最后交给 ByteArrayEncoder 编码类将数据写入到 Channel 通道中,发送给客户端。
根据 LengthFieldPrepender 改写的 LengthHeaderFieldPrepender 代码:
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import java.nio.ByteOrder;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.internal.ObjectUtil;
/**
* 根据 LengthFieldPrepender 类改写
* @author 程就人生
* @Date
*
* 解码前: 解码后:
* +--------+--------+----------------+ +--------+--------+----------------+
* | Header| Length | Actual Content | ----->| Header | Length | Actual Content |
* | 0xFE | 0x0006 | OX00 0X01 0X11 | | 0xFE | 0x0006 | OX00 0X01 0X11 |
* +------ -+--------+----------------+ +--------+--------+----------------+
*/
@Sharable
public class LengthHeaderFieldPrepender extends MessageToMessageEncoder<ByteBuf> {
private final ByteOrder byteOrder;
private final int lengthFieldLength;
private final boolean lengthIncludesLengthFieldLength;
private final int lengthAdjustment;
private final int header;
private final int headerLength;
/**
* Creates a new instance.
*
* @param lengthFieldLength the length of the prepended length field.
* Only 1, 2, 3, 4, and 8 are allowed.
*
* @throws IllegalArgumentException
* if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
*/
public LengthHeaderFieldPrepender(int lengthFieldLength) {
this(lengthFieldLength, false);
}
/**
* Creates a new instance.
*
* @param lengthFieldLength the length of the prepended length field.
* Only 1, 2, 3, 4, and 8 are allowed.
* @param lengthIncludesLengthFieldLength
* if {@code true}, the length of the prepended
* length field is added to the value of the
* prepended length field.
*
* @throws IllegalArgumentException
* if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
*/
public LengthHeaderFieldPrepender(int lengthFieldLength, boolean lengthIncludesLengthFieldLength) {
this(lengthFieldLength, 0, lengthIncludesLengthFieldLength);
}
/**
* Creates a new instance.
*
* @param lengthFieldLength the length of the prepended length field.
* Only 1, 2, 3, 4, and 8 are allowed.
* @param lengthAdjustment the compensation value to add to the value
* of the length field
*
* @throws IllegalArgumentException
* if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
*/
public LengthHeaderFieldPrepender(int lengthFieldLength, int lengthAdjustment) {
this(lengthFieldLength, lengthAdjustment, false);
}
/**
* Creates a new instance.
*
* @param lengthFieldLength the length of the prepended length field.
* Only 1, 2, 3, 4, and 8 are allowed.
* @param lengthAdjustment the compensation value to add to the value
* of the length field
* @param lengthIncludesLengthFieldLength
* if {@code true}, the length of the prepended
* length field is added to the value of the
* prepended length field.
*
* @throws IllegalArgumentException
* if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
*/
public LengthHeaderFieldPrepender(int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength) {
this(ByteOrder.BIG_ENDIAN, lengthFieldLength, lengthAdjustment, lengthIncludesLengthFieldLength, 0, 0);
}
public LengthHeaderFieldPrepender(int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength, int header, int headerLength) {
this(ByteOrder.BIG_ENDIAN, lengthFieldLength, lengthAdjustment, lengthIncludesLengthFieldLength, header, headerLength);
}
/**
* Creates a new instance.
*
* @param byteOrder the {@link ByteOrder} of the length field
* @param lengthFieldLength the length of the prepended length field.
* Only 1, 2, 3, 4, and 8 are allowed.
* @param lengthAdjustment the compensation value to add to the value
* of the length field
* @param lengthIncludesLengthFieldLength
* if {@code true}, the length of the prepended
* length field is added to the value of the
* prepended length field.
*
* @throws IllegalArgumentException
* if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
*/
public LengthHeaderFieldPrepender(
ByteOrder byteOrder, int lengthFieldLength,
int lengthAdjustment, boolean lengthIncludesLengthFieldLength, int header, int headerLength) {
if (lengthFieldLength != 1 && lengthFieldLength != 2 &&
lengthFieldLength != 3 && lengthFieldLength != 4 &&
lengthFieldLength != 8) {
throw new IllegalArgumentException(
"lengthFieldLength must be either 1, 2, 3, 4, or 8: " +
lengthFieldLength);
}
ObjectUtil.checkNotNull(byteOrder, "byteOrder");
this.byteOrder = byteOrder;
this.lengthFieldLength = lengthFieldLength;
this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
this.lengthAdjustment = lengthAdjustment;
this.header = header;
this.headerLength = headerLength;
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
int length = msg.readableBytes() + lengthAdjustment + headerLength;
if (lengthIncludesLengthFieldLength) {
length += lengthFieldLength;
}
// 写字头(新加)
out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) header));
checkPositiveOrZero(length, "length");
switch (lengthFieldLength) {
case 1:
if (length >= 256) {
throw new IllegalArgumentException(
"length does not fit into a byte: " + length);
}
out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
break;
case 2:
if (length >= 65536) {
throw new IllegalArgumentException(
"length does not fit into a short integer: " + length);
}
out.add(ctx.alloc().buffer(2).order(byteOrder).writeShort((short) length));
break;
case 3:
if (length >= 16777216) {
throw new IllegalArgumentException(
"length does not fit into a medium integer: " + length);
}
out.add(ctx.alloc().buffer(3).order(byteOrder).writeMedium(length));
break;
case 4:
out.add(ctx.alloc().buffer(4).order(byteOrder).writeInt(length));
break;
case 8:
out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
break;
default:
throw new Error("should not reach here");
}
out.add(msg.retain());
}
}
在该编码类中,增加了两个成员变量,一个是字头 header,另一个是字头长度 headerLength,分别用在了 137 行和 142 行,计算消息体长度的时候用到了字头长度 headerLength,写入字头的时候用到了字头 header。其实在写入字头的时候也应该像 146 行一样,根据字头长度,写入不同数据类型的字头,这里先免了吧,严谨一点还是要加的。
客户端代码:
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
/**
* netty客户端
* @author 程就人生
* @date
* @Description
*
*/
public class TestClient {
public void connect(int port, String host){
// 客户端Nio线程组
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
// 线程组设置为非阻塞
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
* 包长最大长度不能超过65535
* 长度偏移量 1 个字节
* 长度占用2个字节
* 长度调整-3
* 剥离初始字节0
*/
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,1,2,-3,0));
// Bytebuf 转 byte数组
ch.pipeline().addLast(new ByteArrayDecoder());
// 对要发出去的数据进行编码
/**
* 根据 LengthFieldPrepender解码类改写
* 长度占用2个字节
* 长度调整0
* 长度字段中包含长度本身占用的字节数,
* 字头为0XFE
* 字头占用1个字节
*/
ch.pipeline().addLast(new LengthHeaderFieldPrepender(2, 0, true, 0XFE, 1));
// byte数组 转 Bytebuf
ch.pipeline().addLast(new ByteArrayEncoder());
// 事件处理绑定,具体的业务逻辑处理
ch.pipeline().addLast(new ClientHandler());
}
});
// 建立连接
ChannelFuture channelFuture = bootstrap.connect(host, port);
// 等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
// 优雅退出
group.shutdownGracefully();
}
}
public static void main(String[] argo){
new TestClient().connect(8080, "localhost");
}
}
/**
* 客户端处理handler
* @author 程就人生
* @date
* @Description
*
*/
class ClientHandler extends ChannelInboundHandlerAdapter{
// 对接收的消息次数进行计数
private static int counter;
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 连接成功后,发送消息,连续发送100次,模拟数据交互的频繁
ByteBuf firstMessage = null;
for(int i = 0;i<100;i++){
firstMessage = Unpooled.buffer(7);
firstMessage.writeByte(0X08);
firstMessage.writeByte(0X87);
firstMessage.writeByte(0X9A);
firstMessage.writeByte(0X01);
firstMessage.writeByte(0X5D);
firstMessage.writeByte(0X01);
firstMessage.writeByte(0X90);
ctx.writeAndFlush(firstMessage.array());
}
}
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
byte[] data = (byte[]) msg;
int dataLength = data.length;
ByteBuf buf = Unpooled.buffer(dataLength);
buf.writeBytes(data);
System.out.println("这里是客户端控制台:" + ByteBufUtil.hexDump(buf).toUpperCase() + ";计数: " + ++counter);
// 释放资源
ReferenceCountUtil.release(buf);
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//释放资源
ctx.close();
}
}
客户端在使用匿名内部类设置编解码规则时,需要和服务端保持一致。
在 97 行的 channelActive 方法中,客户端向服务器发送 100 次数据,每条数据不包含字头数据长度占用7个字节,加上字头数据长度,要占用10个字节。服务器端输出的也应该是10个字节的数据。
在客户端业务逻辑处理器 ClientHandler 中的 channelRead 方法中,使用工具类 ByteBufUtil.hexDump(buf).toUpperCase() 打印服务器端发过来的数据到控制台,并使用 ReferenceCountUtil.release(buf) 方法对资源进行释放。
服务器端运行结果:
客户端运行结果:
以上便是 LengthFieldBasedFrameDecoder + ByteArrayDecoder 解码类和 LengthHeaderFieldPrepender + ByteArrayEncoder 编码类组合使用的示例。
网友评论