美文网首页
Netty原理与基础(二)

Netty原理与基础(二)

作者: smallmartial | 来源:发表于2020-10-12 21:13 被阅读0次

    5.Handler业务处理器

    在Reactor反应器经典模型中,反应器查询到IO事件后,分发到Handler业务处理器,由Handler完成IO操作和业务处理。整个的IO处理操作环节包括:从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端


    image.png

    用户程序主要在Handler业务处理器中,Handler涉及的环节为:数据包解码、业务处理、目标数据编码、把数据包写到通道中。

    从应用程序开发人员的角度来看,有入站和出站两种类型操作。
    · 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到ChannelInboundHandler入站处理器。
    · 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。

    • ChannelInboundHandler通道入站处理器


      image.png
    • ChannelOutboundHandler通道出站处理器


      image.png

    5.1ChannelInitializer通道初始化处理器

    通道和Handler业务处理器的关系是:一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器
    如果向流水线中装配业务处理器呢?这就得借助通道的初始化类——ChannelInitializer。

    • initChannel()方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员自己实现。在父通道调用initChannel()方法时,会将新接收的通道作为参数,传递给initChannel()方法。initChannel()方法内部大致的业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。

    5.2ChannelInboundHandler的生命周期

    ChannelInboundHandler的生命周期分2类:

    • 生命周期方法
      (1)handlerAdded() :当业务处理器被加入到流水线后,此方法被回调。也就是在完成ch.pipeline().addLast(handler)语句之后,会回调handlerAdded()。
      (2)channelRegistered():当通道成功绑定一个NioEventLoop线程后,会通过流水线回调所有业务处理器的channelRegistered()方法。(3)channelActive():当通道激活成功后,会通过流水线回调所有业务处理器的channelActive()方法。通道激活成功指的是,所有的业务处理器添加、注册的异步任务完成,并且NioEventLoop线程绑定的异步任务完成。(4)channelInactive():当通道的底层连接已经不是ESTABLISH状态,或者底层连接已经关闭时,会首先回调所有业务处理器的channelInactive()方法。
      (5)channelUnregistered():通道和NioEventLoop线程解除绑定,移除掉对这条通道的事件处理之后,回调所有业务处理器的channelUnregistered ()方法。
      (6)handlerRemoved():最后,Netty会移除掉通道上所有的业务处理器,并且回调所有的业务处理器的handlerRemoved()方法。
      -入栈回调方法
      (1)channelRead():有数据包入站,通道可读。流水线会启动入站处理流程,从前向后,入站处理器的channelRead()方法会被依次回调到。
      (2)channelReadComplete():流水线完成入站处理后,会从前向后,依次回调每个入站处理器的channelReadComplete()方法,表示数据读取完毕。

    5.3代码示例

    public class InHandlerDemo extends ChannelInboundHandlerAdapter {
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Logger.info("被调用:handlerAdded()");
            super.handlerAdded(ctx);
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            Logger.info("被调用:channelRegistered()");
            super.channelRegistered(ctx);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Logger.info("被调用:channelActive()");
            super.channelActive(ctx);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Logger.info("被调用:channelRead()");
            super.channelRead(ctx, msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            Logger.info("被调用:channelReadComplete()");
            super.channelReadComplete(ctx);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Logger.info("被调用:channelInactive()");
            super.channelInactive(ctx);
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            Logger.info("被调用: channelUnregistered()");
            super.channelUnregistered(ctx);
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Logger.info("被调用:handlerRemoved()");
            super.handlerRemoved(ctx);
        }
    }
    

    测试类

    public class InHandlerDemoTester {
        @Test
        public void testInHandlerLifeCircle() {
            final InHandlerDemo inHandler = new InHandlerDemo();
            //初始化处理器
            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                @Override
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline().addLast(inHandler);
                }
            };
            //创建嵌入式通道
            EmbeddedChannel channel = new EmbeddedChannel(i);
            ByteBuf buf = Unpooled.buffer();
            buf.writeInt(1);
            //模拟入站,写一个入站包
            channel.writeInbound(buf);
            channel.flush();
            //模拟入站,再写一个入站包
            channel.writeInbound(buf);
            channel.flush();
            //通道关闭
            channel.close();
            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:Netty原理与基础(二)

          本文链接:https://www.haomeiwen.com/subject/iwbppktx.html