Netty 超时机制介绍
- Netty 为超时机制提供了几个关键类:
- IdleState 是一个枚举,它包含了超时的状态:
  - ALL_IDLE:一段时间内没有数据接收或者发送
  - READER_IDLE:一段时间内没有数据接收
  - WRITER_IDLE:一段时间内没有数据发送
- IdleStateEvent 是超时状态下由 IdleStateHandler 触发的事件
- IdleStateHandler 超时状态处理器,定义什么情况下触发读写超时状态
- ReadTimeoutHandler 读超时状态处理,定义什么情况下触发读超时状态
- WriteTimeoutHandler 写超时状态处理,定义什么情况下触发写超时状态
编写超时状态处理
- 添加 IdleStateHandler 与 HeartbeatServerHandler 至 ChannelHandler 链
/**
 * Channel initializer that installs an {@link IdleStateHandler} followed by a
 * {@link HeartbeatServerHandler} on the pipeline of each initialized channel.
 */
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    /** Reader-idle timeout, in seconds. */
    private static final int READER_IDLE_SECONDS = 4;

    /** Writer-idle timeout, in seconds. */
    private static final int WRITER_IDLE_SECONDS = 5;

    /** Combined read/write idle timeout, in seconds; 0 disables it. */
    private static final int ALL_IDLE_SECONDS = 7;

    /**
     * Appends the idle-state detector and then the heartbeat handler to the
     * channel's pipeline, in that order.
     *
     * @param ch the channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline handlers = ch.pipeline();

        IdleStateHandler idleDetector = new IdleStateHandler(
                READER_IDLE_SECONDS, WRITER_IDLE_SECONDS, ALL_IDLE_SECONDS, TimeUnit.SECONDS);
        handlers.addLast(idleDetector);

        // The heartbeat handler is added after the detector so it sits behind it in the pipeline.
        handlers.addLast(new HeartbeatServerHandler());
    }
}
- 编写心跳处理器,判断是否是 IdleStateEvent 事件,是则处理
/**
 * Inbound handler that answers every {@link IdleStateEvent} by writing a
 * heartbeat message to the channel and printing which idle state occurred.
 * Any other user event is passed to the superclass implementation.
 */
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Heartbeat payload ("Heartbeat" encoded as UTF-8), wrapped with
     * {@code Unpooled.unreleasableBuffer}; each write sends a {@code duplicate()} of it.
     */
    private static final ByteBuf HEARTBEAT_SEQUENCE =
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat", CharsetUtil.UTF_8));

    /**
     * Handles idle events; delegates everything else to the superclass.
     *
     * @param ctx the handler context used to write the heartbeat
     * @param evt the user event that was triggered
     * @throws Exception declared by the overridden method
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleState idleState = ((IdleStateEvent) evt).state();
        String label = describe(idleState);

        // Close the channel if the heartbeat write fails.
        ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        System.out.println(ctx.channel().remoteAddress() + "超时类型:" + label);
    }

    /** Maps an idle state to its printed label; returns an empty string for any other value. */
    private static String describe(IdleState idleState) {
        if (idleState == IdleState.READER_IDLE) {
            return "read idle";
        }
        if (idleState == IdleState.WRITER_IDLE) {
            return "write idle";
        }
        if (idleState == IdleState.ALL_IDLE) {
            return "all idle";
        }
        return "";
    }
}
/**
 * Entry point that bootstraps a Netty NIO server on a given port, using
 * {@link ServerInitializer} to configure each child channel.
 */
public class Server {

    /** TCP port the server binds to. */
    private final int port;

    /**
     * Creates a server for the given port.
     *
     * @param port the port to bind to
     */
    public Server(int port) {
        this.port = port;
    }

    /**
     * Binds the server and blocks until the server channel is closed, then
     * shuts down both event loop groups.
     *
     * @throws Exception if binding or waiting on the channel fails
     */
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInitializer())
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            // Report startup only after the bind has succeeded, so a failed
            // bind does not print a misleading "started" message.
            System.out.println("WebsocketChatServer 启动了");

            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on the port given as the first argument, or 8080 when
     * no argument is supplied.
     *
     * @param args optional command-line arguments; {@code args[0]} is the port
     * @throws Exception if the server fails to start or is interrupted
     */
    public static void main(String[] args) throws Exception {
        int port = (args.length > 0) ? Integer.parseInt(args[0]) : 8080;
        new Server(port).run();
    }
}
网友评论