5.Handler业务处理器
在Reactor反应器经典模型中,反应器查询到IO事件后,分发到Handler业务处理器,由Handler完成IO操作和业务处理。整个的IO处理操作环节包括:从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端
image.png
用户程序主要在Handler业务处理器中,Handler涉及的环节为:数据包解码、业务处理、目标数据编码、把数据包写到通道中。
从应用程序开发人员的角度来看,有入站和出站两种类型操作。
· 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到ChannelInboundHandler入站处理器。
· 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。
-
ChannelInboundHandler通道入站处理器
image.png -
ChannelOutboundHandler通道出站处理器
image.png
5.1ChannelInitializer通道初始化处理器
通道和Handler业务处理器的关系是:一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器
如果向流水线中装配业务处理器呢?这就得借助通道的初始化类——ChannelInitializer。
- initChannel()方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员自己实现。在父通道调用initChannel()方法时,会将新接收的通道作为参数,传递给initChannel()方法。initChannel()方法内部大致的业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。
5.2ChannelInboundHandler的生命周期
ChannelInboundHandler的生命周期分2类:
- 生命周期方法
(1)handlerAdded() :当业务处理器被加入到流水线后,此方法被回调。也就是在完成ch.pipeline().addLast(handler)语句之后,会回调handlerAdded()。
(2)channelRegistered():当通道成功绑定一个NioEventLoop线程后,会通过流水线回调所有业务处理器的channelRegistered()方法。(3)channelActive():当通道激活成功后,会通过流水线回调所有业务处理器的channelActive()方法。通道激活成功指的是,所有的业务处理器添加、注册的异步任务完成,并且NioEventLoop线程绑定的异步任务完成。(4)channelInactive():当通道的底层连接已经不是ESTABLISH状态,或者底层连接已经关闭时,会首先回调所有业务处理器的channelInactive()方法。
(5)channelUnregistered():通道和NioEventLoop线程解除绑定,移除掉对这条通道的事件处理之后,回调所有业务处理器的channelUnregistered ()方法。
(6)handlerRemoved():最后,Netty会移除掉通道上所有的业务处理器,并且回调所有的业务处理器的handlerRemoved()方法。
-入栈回调方法
(1)channelRead():有数据包入站,通道可读。流水线会启动入站处理流程,从前向后,入站处理器的channelRead()方法会被依次回调到。
(2)channelReadComplete():流水线完成入站处理后,会从前向后,依次回调每个入站处理器的channelReadComplete()方法,表示数据读取完毕。
5.3代码示例
public class InHandlerDemo extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Logger.info("被调用:handlerAdded()");
super.handlerAdded(ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Logger.info("被调用:channelRegistered()");
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Logger.info("被调用:channelActive()");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Logger.info("被调用:channelRead()");
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
Logger.info("被调用:channelReadComplete()");
super.channelReadComplete(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Logger.info("被调用:channelInactive()");
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
Logger.info("被调用: channelUnregistered()");
super.channelUnregistered(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Logger.info("被调用:handlerRemoved()");
super.handlerRemoved(ctx);
}
}
测试类
public class InHandlerDemoTester {
@Test
public void testInHandlerLifeCircle() {
final InHandlerDemo inHandler = new InHandlerDemo();
//初始化处理器
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
@Override
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(inHandler);
}
};
//创建嵌入式通道
EmbeddedChannel channel = new EmbeddedChannel(i);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1);
//模拟入站,写一个入站包
channel.writeInbound(buf);
channel.flush();
//模拟入站,再写一个入站包
channel.writeInbound(buf);
channel.flush();
//通道关闭
channel.close();
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
网友评论