美文网首页
Netty高级功能(二):业务处理中线程池和内存池的使用

Netty高级功能(二):业务处理中线程池和内存池的使用

作者: 雪飘千里 | 来源:发表于2019-10-03 22:18 被阅读0次

    1、线程池

    在业务channelHandler中,我们有可能会有一些导致同步阻塞的业务处理逻辑,比如数据库操作,同步的调用第三方服务等,这时候,为了提升性能,我们可以采用线程池来提升并发处理能力。

    线程池添加策略:

    1、业务自定义线程池执行业务channleHandler
    image.png image.png
    2、Netty提供EventExecutorGroup机制来并行执行ChannelHandler
    image.png image.png

    从上面的图中,可以看出来,Netty提供的的EventExecutorGroup针对的是一个NioEventLoop上的多个客户端channel并发处理,如果是一个客户端channel不断的请求,那么这种并发处理并没有用,服务端还是只有一个线程执行客户端业务。简单的说,就是如果不加EventExecutorGroup,则NioEventLoop中的线程会不断的轮询处理它所管理的客户端channel,加上EventExecutorGroup之后,NioEventLoop就可以把其所管理的客户端channel丢给线程池EventExecutorGroup处理,在EventExecutorGroup线程池中,一个线程处理一个channel的读写操作。

    但是业务自定的ExecutorService针对的是所有的客户端channel业务请求并发处理,就算是一个客户端channel的多个请求,也是可以并发处理的,这种锁竞争会很激烈。

    所以,在使用中,如果是客户端的并发连接数channel多,且每个客户端channel的业务请求阻塞不多,那么使用EventExecutorGroup;
    如果客户端并发连接数channel不多,但是客户端channle的业务请求阻塞较多(复杂业务处理和数据库处理),那么使用ExecutorService

    补充:


    image.png image.png

    2、ChannelHandler并发

    ChannelHandler的一端是Netty NIO线程,另一端则是业务线程池(如上面的业务自定义线程池执行业务channleHandler),那么在多线程并发场景下理解ChannelHandler的并发安全性就很重要了。

    以下面这段代码为例,在多线程环境下loss_connect_time调用安全么?

    public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
        private static final Logger logger = LoggerFactory.getLogger(AcceptorIdleStateTrigger.class);
            //非线程安全
        private int loss_connect_time = 0;
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                IdleState state=event.state();
                if (state==IdleState.READER_IDLE) {
                    loss_connect_time++;
                    logger.info(String.valueOf(NettyConstants.SERVER_READ_IDEL_TIME_OUT*loss_connect_time)+"秒没有接收到客户端"+ FactoryMap.getDevNoByChannel((SocketChannel)ctx)+"的信息了");
                    if(loss_connect_time>=NettyConstants.MAX_LOSS_CONNECT_TIME){
                        logger.info("------------服务器主动关闭客户端链路");
                        ctx.channel().close();
                    }
                }
            } else {
                super.userEventTriggered(ctx,evt);
            }
        }
       
    }
    
    // 4、TCP连接通道channel建立时创建ChannelPipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // 为通道进行初始化: 数据传输过来的时候会进行拦截和执行
                            ChannelPipeline  pipeline = sc.pipeline();
                            pipeline.addLast(new ObjectDecoder(1024 * 1024,
                                    ClassResolvers.weakCachingConcurrentResolver(this
                                            .getClass().getClassLoader())));
                            pipeline.addLast(new ObjectEncoder());
                            //每个链路都有自己对应的业务Handler实例,不共享
                            pipeline.addLast(new ServerHandler());
                        }
    

    如果ChannelHandler是非共享的,则它就是线程安全的。
    原因:当链路完成初始化时会创建ChannelPipeline,每个Channel对应一个ChannelPipeline实例,业务的channelHandler会被实例化化并加入ChannelPipeline执行。
    一个channel对应一个channelPipeline,一个channelPipeline上串行执行多个Handler。由于一个Channel只能被特定的NioEventLoop线程执行,也就是说单个NioEventLoop线程串行执行所有channelHandler,
    因此ChannelHandler不会被并发调用,不用考虑线程安全问题。

    image.png
    跨链路共享的ChannelHandler

    如果某个ChannelHandler需要全局共享,则只需要在Handler上添加@ChannelHandler.Sharable注解就可以被添加到多个ChannelPipeline上。

    @ChannelHandler.Sharable
    public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
      //代码略
    }
    

    当channelHandler被添加到多个ChannelPipeline,就会面临多线程并发访问问题,需要ChannelHandler保证自身的线程安全,例如通过原子类,读写锁等方式对数据做并发保护。如果加锁,可能会阻塞NioEventLoop线程,所以@ChannelHandler.Sharable要慎用。

    使用场景

    用户自定义的ChannelHandler有两种场景需要考虑并发安全。

    • 1、通过@ChannelHandler.Sharable注解,多个ChannelPipeline共享的ChannelHandler,它将被多个NioEventLoop线程并发访问。在这种场景下,用户需要保证ChannelHandler共享的合理性,同时需要自己保证它的并发安全性,尽量通过原子类等方式降低锁的开销,防止阻塞NioEventLoop线程。
    image.png
    • 2、ChannelHandler没有共享,但是在用户的ChannelPipeline中的一些ChannelHandler绑定了线程池,这样ChannelPipe的channelHandler就会被异步执行。(第一节中的Netty提供EventExecutorGroup机制来并行执行ChannelHandler
    image.png

    3、内存池

    Netty从4.X引入内存池机制,默认情况下,都是采用内存池模式创建ByteBuf对象,这也是其性能表现超级优异的一个原因之一,我们开发的过程中并不需要去考虑,通常情况下,我们是在编解码(ByteToMessageDecoder)时,从ByteBuf中读取数据到数组,然后进行解码操作,那么我们需要手动释放ByteBuf内存么?不释放会引起OOM么???

    我们以下面一个例子来说明这个问题

    image.png

    在上图这段代码中,涉及到两个ByteBuf,一个是框架分配的(msg),一个是业务中自定义的respMsg,如果出问题后,通常很容易想到是业务自定义byteBuf出问题,业务从内存池中申请了ByteBuf,但是没有主动释放它,看过源码后,可以发现这里并不会出问题,因为在调用ctx.writeAndFlush(respMsg)后,netty框架会主动释放内存。

    那msg为啥会出问题呢,同样看下源码,尤其对比着看下io.netty.channel.SimpleChannelInboundHandler#channelRead的就知道了,继承ChannelInboundHandlerAdapter实现其channelRead方法后,是需要用户自己手动释放内存的。

    所以,解决上面的问题有三种办法,
    第一种,其实通过框架来实现,我们业务Handler实现SimpleChannelInboundHandler<ByteBuf>,在其io.netty.channel.SimpleChannelInboundHandler#channelRead方法中框架有做释放,ReferenceCountUtil.release(msg);

    image.png

    第二种,就是在代码的最后一行添加ReferenceCountUtil.release(msg)来释放ByteBuf

    image.png

    第三种,在业务ChannelInboundHandler中调用ctx.fireChannelRead(msg)方法,让请求消息继续往后执行,直到调用DefaultChannelPipeline的内部类TailContext,由他来负责释放请求消息;io.netty.channel.DefaultChannelPipeline.TailContext#channelRead -> onUnhandledInboundMessage

        protected void onUnhandledInboundMessage(Object msg) {
            try {
                logger.debug(
                        "Discarded inbound message {} that reached at the tail of the pipeline. " +
                                "Please check your pipeline configuration.", msg);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    

    注:像上面图中这样直接使用ctx.writeAndFlush(byteBuf)时,在NettyServerInitializer.initChannel的ChannelPipeline中是不需要再添加解码器Encoder()的,因为这里已经是直接对写入byteBuf了,看源码就知道,在解码器MessageToMessageEncoder中,也是把消息写入byteBuf,
    以源码的ByteArrayEncoder为例,如下

    @Sharable
    public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]> {
        @Override
        protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception {
            out.add(Unpooled.wrappedBuffer(msg));
        }
    }
    
    

    总结:这个是很多netty初学者最容易出错的地方,网上的资料写的又经常互相矛盾。事实上我们只要按照 解码decoder——业务处理handler——编码encoder 这个流程来处理,基本上不会有内存释放的问题。

    • 解码decoder——根据业务,把byteBuf中数据取出来,然后转换成你想要的数据(对象,或者数组);
    • 业务处理handler(继承SimpleChannelInboundHandler)——直接构建返回对象,或者数据,然后调用ctx.writeAndFlush(respMsg);
    • 编码encoder——通常不需要自定义,可以直接调用框架提供的,比如
      ByteArrayEncoder、StringEncoder;

    当我们有多种协议同时处理时,可以添加多个编码器,也可以不添加,但是要在业务handler处理中,就直接写入byteBuf

    相关文章

      网友评论

          本文标题:Netty高级功能(二):业务处理中线程池和内存池的使用

          本文链接:https://www.haomeiwen.com/subject/dmgdpctx.html