- 案例代码下载
- 初学者的话推荐直接套用 all-in-one 的 jar 包,若熟悉 Netty 可以根据需求添加不同的 jar 包
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.22.Final</version>
</dependency>
TIME 协议(服务器)
目标:编写一个 TIME 协议,服务器端在接收到客户端的连接时会向客户端发送一个 32 位的时间戳,并且一旦消息发送成功就会立即关闭
编写 Handler
- 首先编写 Handler(处理器),继承 ChannelInboundHandlerAdapter 类(该类默认将事件自动传播到下一个入站处理器)并重写其两个事件方法:
- channelActive():Channel激活,当有客户端连接时触发
- exceptionCaught():捕获到异常时触发
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) {
final ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time);
f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
-
代码分析:
- 我们需要写入一个 32位 的时间戳,因此需要一个至少有 4 个字节的 ByteBuf,通过 ChannelHandlerContext.alloc() 得到一个当前的 ByteBufAllocator,然后分配一个新的缓冲
- 通过 ByteBuf 的 write() 方法写入时间戳
- 通过 ChannelHandlerContext 对象的 writeAndFlush() 方法将字节容器的数据写入缓冲区并刷新,它会返回一个 ChannelFuture 对象
- 因为 Netty 的操作都是异步的,例如下面代码中的消息在被发送之前可能会被先关闭连接
Channel ch = ...; ch.writeAndFlush(message); ch.close();
- 因此 close() 方法需要在数据通过 write() 发送到客户端之后在调用,因此为 ChannelFuture 增加一个 ChannelFutureListener 来监听操作完成事件,并关闭 Channel
- 也可以使用简单的预定义监听代码
f.addListener(ChannelFutureListener.CLOSE);
- ctx.write(Object) 方法不会使消息写入到通道上,它被缓冲在了内部,你需要调用 ctx.flush() 方法来把缓冲区中数据强行输出。或者你可以用更简洁的 cxt.writeAndFlush(msg) 以达到同样的目的
编写服务器类
- 服务端需要两个 NioEventLoopGroup,它本质是一个线程池:
- bossGroup:处理客户端连接事件的线程池
- workerGroup:处理连接后所有事件的线程池
- ServerBootstrap 是服务端的辅助启动类,用于创建服务端
- 指定连接该服务器的 Channel 类型为 NioServerSocketChannel
- 通过 ChannelInitializer 辅助配置客户端连接生成的 Channel,指定需要执行的 Handler
- 设置 EventLoopGroup 参数:
- .option:用于设置 bossGroup 的相关参数
- .childOption:用于设置workerGroup相关参数
- 绑定端口,并调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture,调用 ChannelFuture 的 sync() 阻塞方法直到服务端关闭链路之后才退出 main() 函数
public class Server {
private int port;
public Server(int port) {
this.port = port;
}
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 处理客户端连接事件的线程池
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理连接后所有事件的线程池
try {
ServerBootstrap bootstrap = new ServerBootstrap(); // NIO 服务的辅助启动类
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 指定连接该服务器的 Channel 类型为 NioServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler()); // 指定需要执行的 Handler
}
})
.option(ChannelOption.SO_BACKLOG, 128) // 设置 bossGroup 的相关参数
.childOption(ChannelOption.SO_KEEPALIVE, true); // 设置 workerGroup 相关参数
ChannelFuture f = bootstrap.bind(port).sync(); // 绑定端口,调用 ChannelFuture 的 sync() 阻塞方法等待绑定完成
// 调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture
// 调用 ChannelFuture 的 sync() 阻塞方法直到服务端关闭链路之后才退出 main() 函数
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出机制。。。退出线程池(该方法源码没读过,也不知怎么个优雅方式)
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = (args.length > 0) ? Integer.parseInt(args[0]) : 8080;
new Server(port).run();
}
}
TIME 协议(客户端)
目标:连接服务端并接收服务端发送的时间戳消息,输出到控制台
编写 Handler
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
编写客户端类
- 与服务端唯一不同的是使用 BootStrap 和 Channel 的实现,并调用 connect() 方法连接服务端
public class Client {
public static void main(String[] args) throws Exception {
String host = (args.length == 1) ? args[0] : "localhost";
int port = (args.length == 2) ? Integer.parseInt(args[1]) : 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
ChannelFuture f = bootstrap.connect(host, port).sync(); // 连接服务端,调用 ChannelFuture 的 sync() 阻塞方法等待连接完成
// 调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture
// 调用 ChannelFuture 的 sync() 阻塞方法直到客户端关闭链路之后才退出 main() 函数
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
处理基于流的传输
回到 TIME 客户端例子,服务端发送的数据是一个 32位 的时间戳,如果服务端发送了 16位 的数据呢,那客户端读取的数据就不准确了
解决方法一
- 构造一个内部的缓冲,只有直到 4 个字节全部接收到内部缓冲,才进行处理
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf byteBuf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
byteBuf = ctx.alloc().buffer(4);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
byteBuf.release();
byteBuf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
byteBuf.writeBytes(m);
m.release();
if (byteBuf.readableBytes() >= 4){
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
-
代码分析:
- ChannelHandler 有 2 个生命周期监听方法:handlerAdded()、handlerRemoved() ,你可以完成任意初始化任务,只要它不会阻塞很长时间
- 分配一个 4 个字节的字节容器,将读取的数据写入该字节容器
- 判断容器中是否有足够的数据(4个字节),如果有在进行业务处理
解决方法二
- 方法一虽然解决了问题但修改后的处理器并不简洁,可以把一整个 ChannelHandler 拆分成多个 ChannelHandler 以减少应用复杂度,多个 ChannelHandler 构成一个处理链
- 因此可以将 TimeClientHandler 拆分成2个处理器:
- TimeDecoder:解析数据
- TimeClientHandler:处理业务,跟初始实现一样
- Netty 提供了 ByteToMessageDecoder 可以帮助完成 TimeDecoder 的开发,ByteToMessageDecoder 是 ChannelInboundHandler 的一个实现类,它可以让数据解析变得更简单
public class TimeDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if (byteBuf.readableBytes() < 4) {
return;
}
list.add(byteBuf.readBytes(4));
}
}
-
代码分析:
- 每当有新数据接收时,ByteToMessageDecoder 都会调用 decode() 方法来处理字节容器 ByteBuf 对象
- 如果在 decode() 方法里增加一个对象到 list 对象里面,则意味着解码消息成功,ByteToMessageDecoder 将会丢弃在字节容器 ByteBuf 里已经被读过的数据
- 修改 ChannelInitializer 将另外一个 ChannelHandler 加入到 ChannelPipeline
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
}
});
用 POJO 代替 ByteBuf
之前例子使用 ByteBuf 作为协议消息的数据结构,目前读取的仅仅是一个 32 位 的数据,直接使用 ByteBuf 不是问题,然而在真实的协议中,数据量肯定不止如此,通过 ByteBuf 处理数据将变的复杂困难,因此下面介绍如何使用 POJO(普通 Java 对象) 代替 ByteBuf
- 首先定义新类型 UnixTime
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
- 修改 TimeDecoder 类,返回一个 UnixTime 以代替 ByteBuf
protected void decode(ChannelHandlerContext channelHandlerContext,ByteBuf byteBuf, List<Object> list) throws Exception {
if (byteBuf.readableBytes() < 4) {
return;
}
list.add(new UnixTime(byteBuf.readInt()));
}
- 修改 TimeClientHandler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime unixTime = (UnixTime) msg;
ctx.close();
}
- 修改 TimeServerHandler
@Override
public void channelActive(final ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
- 最后还需要实现一个编码器,通过实现 ChannelOutboundHandler 来将 UnixTime 对象转换为 ByteBuf,这里有两个点要注意:
- 当编码后的数据被写到了通道上 Netty 可以通过 ChannelPromise 对象的标记确认成功或失败
- 不需要调用 cxt.flush(),因为处理器已经单独分离出了一个方法 void flush(ChannelHandlerContext cxt),如果想自己实现 flush() 方法内容可以自行覆盖这个方法
public class TimeEncoder extends ChannelOutboundHandlerAdapter{
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise);
}
}
- 你可以使用 MessageToByteEncode
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
- 最后将 TimeEncoder 加入到 ChannelPipeline,并位于 TimeServerHandler 之前
关闭应用
- 关闭一个 Netty 应用只需通过 shutdownGracefully() 方法来关闭所有的 EventLoopGroup
- 当所有的 EventLoopGroup 被完全地终止,并且对应的所有 channel 都已经被关闭时,Netty 会返回一个 Future 对象来通知你
网友评论