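In the complex world of networking, applications rely on a variety of protocols to communicate with each other, for example HTTP, Telnet, FTP, and SMTP.
During development we sometimes need to build an application-layer protocol tailored to our own business, such as the 8583 message format commonly used in banking. Netty, widely regarded as one of the best frameworks in the Java NIO space, helps us build custom protocols quickly. This article walks through a concise example.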
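Protocol definition
Protocol name: FF
Protocol rules:
As shown in the protocol diagram, a message consists of two parts, Header and Content. The Header is 44 bytes long: a 4-byte version, a 4-byte content-length, and a 36-byte sessionId. The Content is variable-length, and its length is given by the content-length field in the header.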
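To make the layout concrete, here is a minimal sketch in plain Java (using java.nio.ByteBuffer rather than Netty) that lays out a single FF frame. The field sizes are inferred from the decoder shown later in this article. The class name FFFrameLayoutSketch and the encode helper are hypothetical, and encoding the content as UTF-8 is an assumption, not something the protocol description states.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper, for illustration only: builds one FF frame as a byte array.
final class FFFrameLayoutSketch {
    // A UUID string such as the one the client generates is 36 characters long.
    static final int SESSION_ID_LENGTH = 36;
    // version (4 bytes) + content length (4 bytes) + sessionId (36 bytes) = 44 bytes
    static final int HEADER_LENGTH = 4 + 4 + SESSION_ID_LENGTH;

    static byte[] encode(int version, String sessionId, String content) {
        byte[] session = sessionId.getBytes(StandardCharsets.US_ASCII);
        if (session.length != SESSION_ID_LENGTH) {
            throw new IllegalArgumentException("sessionId must be 36 bytes");
        }
        // Assumption: content is encoded as UTF-8; the length field counts bytes, not chars.
        byte[] body = content.getBytes(StandardCharsets.UTF_8);

        // ByteBuffer writes ints in big-endian order by default.
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putInt(version);      // offset 0
        buf.putInt(body.length);  // offset 4
        buf.put(session);         // offset 8
        buf.put(body);            // offset 44
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] frame = encode(1, UUID.randomUUID().toString(), "Hello!");
        System.out.println("header bytes = " + HEADER_LENGTH + ", frame bytes = " + frame.length);
    }
}
```

With a 36-character UUID as the sessionId and the content Hello!, the frame is 44 + 6 = 50 bytes long.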
Defining the message objects
FFHeader.java
package com.jack.study.netty01.customJianShu.mesage;
// Message header
public class FFHeader {
    // Protocol version
    private int version;
    // Length of the message content
    private int contentLength;
    // Session id
    private String sessionId;
    public FFHeader(int version, int contentLength, String sessionId) {
        this.version = version;
        this.sessionId = sessionId;
        this.contentLength = contentLength;
    }
    public int getVersion() {
        return version;
    }
    public void setVersion(int version) {
        this.version = version;
    }
    public int getContentLength() {
        return contentLength;
    }
    public void setContentLength(int contentLength) {
        this.contentLength = contentLength;
    }
    public String getSessionId() {
        return sessionId;
    }
    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }
}
FFMessage.java
package com.jack.study.netty01.customJianShu.mesage;
// The message: a header plus the content
public class FFMessage {
    private FFHeader luckHeader;
    private String content;
    public FFMessage(FFHeader luckHeader, String content) {
        this.luckHeader = luckHeader;
        this.content = content;
    }
    public FFHeader getLuckHeader() {
        return luckHeader;
    }
    public void setLuckHeader(FFHeader luckHeader) {
        this.luckHeader = luckHeader;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
    @Override
    public String toString() {
        return String.format("[version=%d,contentLength=%d,sessionId=%s,content=%s]", luckHeader.getVersion(),
                luckHeader.getContentLength(), luckHeader.getSessionId(), content);
    }
}
Defining the encoder
package com.jack.study.netty01.customJianShu.codec;
import java.nio.charset.StandardCharsets;
import com.jack.study.netty01.customJianShu.mesage.FFHeader;
import com.jack.study.netty01.customJianShu.mesage.FFMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Encoder: serializes an FFMessage into bytes.
 */
public class FFEncoder extends MessageToByteEncoder<FFMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, FFMessage message, ByteBuf out) throws Exception {
        // Convert the message into binary data
        FFHeader header = message.getLuckHeader();
        // Write the header fields: version, content length, sessionId (expected to be 36 bytes)
        out.writeInt(header.getVersion());
        out.writeInt(header.getContentLength());
        // Use an explicit charset instead of the platform default
        out.writeBytes(header.getSessionId().getBytes(StandardCharsets.UTF_8));
        // Write the message body; contentLength must match the number of bytes written here
        out.writeBytes(message.getContent().getBytes(StandardCharsets.UTF_8));
    }
}
There is not much to explain here: the fields are simply written out in the order the protocol defines.
Defining the decoder
package com.jack.study.netty01.customJianShu.codec;
import java.nio.charset.StandardCharsets;
import java.util.List;
import com.jack.study.netty01.customJianShu.mesage.FFHeader;
import com.jack.study.netty01.customJianShu.mesage.FFMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
/**
 * Decoder: turns the incoming byte stream back into FFMessage objects.
 */
public class FFDecoder extends ByteToMessageDecoder {
    // Header length: 4 (version) + 4 (content length) + 36 (sessionId)
    private final static int HEADER_LENGTH = 44;
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Not enough bytes for a full header yet: wait for more data
        if (in.readableBytes() < HEADER_LENGTH) {
            return;
        }
        // Read the protocol version
        int version = in.readInt();
        // Read the content length
        int contentLength = in.readInt();
        // Read the sessionId
        byte[] sessionByte = new byte[36];
        in.readBytes(sessionByte);
        String sessionId = new String(sessionByte, StandardCharsets.UTF_8);
        // Assemble the header
        FFHeader header = new FFHeader(version, contentLength, sessionId);
        // Content not fully received: rewind the reader index past the header and wait
        if (in.readableBytes() < contentLength) {
            in.setIndex(in.readerIndex() - HEADER_LENGTH, in.writerIndex());
            return;
        }
        byte[] content = new byte[contentLength];
        // Read the message content
        in.readBytes(content);
        FFMessage message = new FFMessage(header, new String(content, StandardCharsets.UTF_8));
        out.add(message);
    }
}
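This class is the core of the processing. Its two if blocks deal with TCP packet splitting and coalescing (the "half packet" and "sticky packet" problems): the first waits until a complete 44-byte header has arrived, and the second rewinds the reader index and waits when the content has not fully arrived. Without these two checks, parsing becomes garbled as soon as multiple messages are in flight.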
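The effect of the two checks can be reproduced without Netty. The following is a minimal sketch in plain Java, assuming the 44-byte header layout used by the decoder above. The class FFStreamDecoderSketch and its feed and frame methods are hypothetical names for illustration. Unlike FFDecoder, the sketch peeks at the length field instead of reading the header and rewinding, and it loops over the buffer itself, roughly the way ByteToMessageDecoder keeps calling decode while complete messages are available.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the decoder, for illustration only.
final class FFStreamDecoderSketch {
    static final int HEADER_LENGTH = 44; // 4 (version) + 4 (content length) + 36 (sessionId)

    // Bytes received so far that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Appends a chunk of the stream and returns the content of every frame that is now complete. */
    List<String> feed(byte[] chunk) {
        ByteBuffer joined = ByteBuffer.allocate(pending.remaining() + chunk.length);
        joined.put(pending);
        joined.put(chunk);
        joined.flip();

        List<String> contents = new ArrayList<>();
        // Check 1: stop when fewer than HEADER_LENGTH bytes are left.
        while (joined.remaining() >= HEADER_LENGTH) {
            // Absolute read of the length field: does not move the position.
            int contentLength = joined.getInt(joined.position() + 4);
            // Check 2: the content has not fully arrived, so leave the header unread.
            if (joined.remaining() < HEADER_LENGTH + contentLength) {
                break;
            }
            joined.position(joined.position() + HEADER_LENGTH); // skip the header
            byte[] content = new byte[contentLength];
            joined.get(content);
            contents.add(new String(content, StandardCharsets.UTF_8));
        }
        pending = joined; // whatever remains is kept for the next chunk
        return contents;
    }

    /** Builds one frame with version 1 and a placeholder sessionId of 36 zero bytes. */
    static byte[] frame(String content) {
        byte[] body = content.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putInt(1);
        buf.putInt(body.length);
        buf.put(new byte[36]);
        buf.put(body);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] a = frame("A");
        byte[] b = frame("BC");
        byte[] stream = new byte[a.length + b.length];
        System.arraycopy(a, 0, stream, 0, a.length);
        System.arraycopy(b, 0, stream, a.length, b.length);

        // Deliver the two frames in three arbitrary pieces, as TCP might.
        FFStreamDecoderSketch decoder = new FFStreamDecoderSketch();
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 0, 10)));
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 10, 60)));
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 60, stream.length)));
    }
}
```

The first chunk is shorter than a header, so nothing is emitted. The second chunk completes the first frame and carries part of the second, and the third chunk completes the second frame. Feeding the whole stream in one call yields both messages at once, which is the coalesced case.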
Server: listening for messages
package com.jack.study.netty01.customJianShu.server;
import com.jack.study.netty01.customJianShu.codec.FFDecoder;
import com.jack.study.netty01.customJianShu.codec.FFEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class Server {
    // Port to listen on
    private static final int PORT = 8888;
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // Set socket options
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // use an NIO server channel
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Add the codec. Subclasses of ByteToMessageDecoder cannot be
                            // annotated with @Sharable, so every channel needs its own decoder instance.
                            pipeline.addLast(new FFEncoder());
                            pipeline.addLast(new FFDecoder());
                            // Add the business logic handler
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            // Bind the port and start listening for connections
            Channel ch = serverBootstrap.bind(PORT).sync().channel();
            System.out.printf("FF protocol server started at 127.0.0.1:%d/\n", PORT);
            // Block until the server channel is closed
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.jack.study.netty01.customJianShu.server;
import com.jack.study.netty01.customJianShu.mesage.FFMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ServerHandler extends SimpleChannelInboundHandler<FFMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FFMessage msg) throws Exception {
        // Simply print the message the server received
        System.out.println("Received: " + msg);
    }
}
Client: sending messages
package com.jack.study.netty01.customJianShu.client;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import com.jack.study.netty01.customJianShu.codec.FFDecoder;
import com.jack.study.netty01.customJianShu.codec.FFEncoder;
import com.jack.study.netty01.customJianShu.mesage.FFHeader;
import com.jack.study.netty01.customJianShu.mesage.FFMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Add the encoder
                    pipeline.addLast(new FFEncoder());
                    // Add the decoder
                    pipeline.addLast(new FFDecoder());
                    // Business handler (only prints the message)
                    pipeline.addLast(new ClientHandler());
                }
            });
            // Connect to the server
            Channel ch = b.connect("127.0.0.1", 8888).sync().channel();
            int version = 1;
            String sessionId = UUID.randomUUID().toString();
            String str = "Hello!";
            // Send 100000 messages
            for (int i = 0; i < 100000; i++) {
                String content = str + "----" + i;
                // contentLength is a byte count, so measure the encoded bytes rather than the chars
                int contentLength = content.getBytes(StandardCharsets.UTF_8).length;
                FFHeader header = new FFHeader(version, contentLength, sessionId);
                FFMessage message = new FFMessage(header, content);
                ch.writeAndFlush(message);
            }
            ch.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
package com.jack.study.netty01.customJianShu.client;
import com.jack.study.netty01.customJianShu.mesage.FFMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ClientHandler extends SimpleChannelInboundHandler<FFMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FFMessage message) throws Exception {
        System.out.println(message);
    }
}
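Why send 100000 messages here rather than just one? Mainly to test that message handling is correct: sending a single message cannot expose the packet splitting and coalescing problems.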
Testing
- Start Server.java
- Start Client.java
Screenshot of the run: