在之前的文章中,介绍了DelimiterBasedFrameDecoder
(基于特殊符号的编码器)和FixedLengthFrameDecoder
定长编码器)的用法.这篇文章主要是介绍使用LengthFieldBasedFrameDecoder解码器自定义协议.通常,协议的格式如下:
通常来说,使用
ByteToMessageDocoder
这个编码器,我们要分别解析出Header,length,body这几个字段.而使用LengthFieldBasedFrameDecoder
,我们就可以直接接收想要的一部分,相当于在原来的基础上包上了一层,有了这层之后,我们可以控制我们每次只要读想读的字段,这对于自定义协议来说十分方便.
1. LengthFieldBasedFrameDecoder
的定义
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {
private static final int HEADER_SIZE = 6;
/**
*
* @param maxFrameLength 帧的最大长度
* @param lengthFieldOffset length字段偏移的地址
* @param lengthFieldLength length字段所占的字节长
* @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
* @param initialBytesToStrip 解析时候跳过多少个长度
* @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
*/
public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分
in = (ByteBuf) super.decode(ctx,in);
if(in == null){
return null;
}
if(in.readableBytes()<HEADER_SIZE){
throw new Exception("字节数不足");
}
//读取type字段
byte type = in.readByte();
//读取flag字段
byte flag = in.readByte();
//读取length字段
int length = in.readInt();
if(in.readableBytes()!=length){
throw new Exception("标记的长度不符合实际长度");
}
//读取body
byte []bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8"));
}
}
- 在上述的代码中,调用父类的方法,实现截取到自己想要的字段.
2. 协议实体的定义
public class MyProtocolBean {
//类型 系统编号 0xA 表示A系统,0xB 表示B系统
private byte type;
//信息标志 0xA 表示心跳包 0xC 表示超时包 0xC 业务信息包
private byte flag;
//内容长度
private int length;
//内容
private String content;
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public byte getFlag() {
return flag;
}
public void setFlag(byte flag) {
this.flag = flag;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public MyProtocolBean(byte flag, byte type, int length, String content) {
this.flag = flag;
this.type = type;
this.length = length;
this.content = content;
}
@Override
public String toString() {
return "MyProtocolBean{" +
"type=" + type +
", flag=" + flag +
", length=" + length +
", content='" + content + '\'' +
'}';
}
}
3.服务端的实现
public class Server {
private static final int MAX_FRAME_LENGTH = 1024 * 1024; //最大长度
private static final int LENGTH_FIELD_LENGTH = 4; //长度字段所占的字节数
private static final int LENGTH_FIELD_OFFSET = 2; //长度偏移
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = 0;
private int port;
public Server(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
ch.pipeline().addLast(new ServerHandler());
};
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的连接
ChannelFuture future = sbs.bind(port).sync();
System.out.println("Server start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new Server(port).start();
}
}
- 关于
LengthFieldBasedFrameDecoder
的各个字段的头特别意义,可以参考这篇文章(http://www.voidcn.com/article/p-gzqzzsyz-bqy.html),在这里,我们只需要掌握
`
maxFrameLength: 帧的最大长度
lengthFieldOffset length: 字段偏移的地址
lengthFieldLength length;字段所占的字节长
lengthAdjustment: 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
initialBytesToStrip: 解析时候跳过多少个长度
failFast; 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
- 只需要把
MyProtocolDecoder
加入pipeline中就可以.但是得在Handler之前.
- 服务端Hanlder
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MyProtocolBean myProtocolBean = (MyProtocolBean)msg; //直接转化成协议消息实体
System.out.println(myProtocolBean.getContent());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
}
* 服务端Handler没什么特别的地方,只是输出接收到的消息
5. 客户端和客户端Handler
public class Client {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyProtocolEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = b.connect(HOST, PORT).sync();
future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
客户端Handler
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty");
ctx.writeAndFlush(myProtocolBean);
}
}
- 客户端Handler实现发送消息.
客户端编码器
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> {
@Override
protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception {
if(msg == null){
throw new Exception("msg is null");
}
out.writeByte(msg.getType());
out.writeByte(msg.getFlag());
out.writeInt(msg.getLength());
out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8")));
}
}
- 编码的时候,只需要按照定义的顺序依次写入到ByteBuf中.
6. 总结
通过以上的消息,我们能够在客户端和服务端实现自定义协议的交互.其实,在以上的过程中,如果我们把
ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
改成
ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,6,false));
那么我们就可以直接跳过Header和length字段,从而解码的时候,就只需要对body字段解码就行.其他字段跳过就忽略了.这也就是调用父类的方法的原因(使最后拿到的ByteBuf只有body部分,而没有其他部分).
参考文章
一起学Netty(九)之LengthFieldBasedFrameDecoder
Netty的LengthFieldBasedFrameDecoder使用
网友评论