1、线程池
在业务channelHandler中,我们有可能会有一些导致同步阻塞的业务处理逻辑,比如数据库操作,同步的调用第三方服务等,这时候,为了提升性能,我们可以采用线程池来提升并发处理能力。
线程池添加策略:
1、业务自定义线程池执行业务channleHandler
image.png image.png2、Netty提供EventExecutorGroup机制来并行执行ChannelHandler
image.png image.png从上面的图中,可以看出来,Netty提供的的EventExecutorGroup针对的是一个NioEventLoop上的多个客户端channel并发处理,如果是一个客户端channel不断的请求,那么这种并发处理并没有用,服务端还是只有一个线程执行客户端业务。简单的说,就是如果不加EventExecutorGroup,则NioEventLoop中的线程会不断的轮询处理它所管理的客户端channel,加上EventExecutorGroup之后,NioEventLoop就可以把其所管理的客户端channel丢给线程池EventExecutorGroup处理,在EventExecutorGroup线程池中,一个线程处理一个channel的读写操作。
但是业务自定的ExecutorService针对的是所有的客户端channel业务请求并发处理,就算是一个客户端channel的多个请求,也是可以并发处理的,这种锁竞争会很激烈。
所以,在使用中,如果是客户端的并发连接数channel多,且每个客户端channel的业务请求阻塞不多,那么使用EventExecutorGroup;
如果客户端并发连接数channel不多,但是客户端channle的业务请求阻塞较多(复杂业务处理和数据库处理),那么使用ExecutorService
补充:
image.png image.png
2、ChannelHandler并发
ChannelHandler的一端是Netty NIO线程,另一端则是业务线程池(如上面的业务自定义线程池执行业务channleHandler),那么在多线程并发场景下理解ChannelHandler的并发安全性就很重要了。
以下面这段代码为例,在多线程环境下loss_connect_time调用安全么?
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(AcceptorIdleStateTrigger.class);
//非线程安全
private int loss_connect_time = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
IdleState state=event.state();
if (state==IdleState.READER_IDLE) {
loss_connect_time++;
logger.info(String.valueOf(NettyConstants.SERVER_READ_IDEL_TIME_OUT*loss_connect_time)+"秒没有接收到客户端"+ FactoryMap.getDevNoByChannel((SocketChannel)ctx)+"的信息了");
if(loss_connect_time>=NettyConstants.MAX_LOSS_CONNECT_TIME){
logger.info("------------服务器主动关闭客户端链路");
ctx.channel().close();
}
}
} else {
super.userEventTriggered(ctx,evt);
}
}
}
// 4、TCP连接通道channel建立时创建ChannelPipeline
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 为通道进行初始化: 数据传输过来的时候会进行拦截和执行
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast(new ObjectDecoder(1024 * 1024,
ClassResolvers.weakCachingConcurrentResolver(this
.getClass().getClassLoader())));
pipeline.addLast(new ObjectEncoder());
//每个链路都有自己对应的业务Handler实例,不共享
pipeline.addLast(new ServerHandler());
}
如果ChannelHandler是非共享的,则它就是线程安全的。
原因:当链路完成初始化时会创建ChannelPipeline,每个Channel对应一个ChannelPipeline实例,业务的channelHandler会被实例化化并加入ChannelPipeline执行。
一个channel对应一个channelPipeline,一个channelPipeline上串行执行多个Handler。由于一个Channel只能被特定的NioEventLoop线程执行,也就是说单个NioEventLoop线程串行执行所有channelHandler,
因此ChannelHandler不会被并发调用,不用考虑线程安全问题。
跨链路共享的ChannelHandler
如果某个ChannelHandler需要全局共享,则只需要在Handler上添加@ChannelHandler.Sharable注解就可以被添加到多个ChannelPipeline上。
@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
//代码略
}
当channelHandler被添加到多个ChannelPipeline,就会面临多线程并发访问问题,需要ChannelHandler保证自身的线程安全,例如通过原子类,读写锁等方式对数据做并发保护。如果加锁,可能会阻塞NioEventLoop线程,所以@ChannelHandler.Sharable要慎用。
使用场景
用户自定义的ChannelHandler有两种场景需要考虑并发安全。
- 1、通过@ChannelHandler.Sharable注解,多个ChannelPipeline共享的ChannelHandler,它将被多个NioEventLoop线程并发访问。在这种场景下,用户需要保证ChannelHandler共享的合理性,同时需要自己保证它的并发安全性,尽量通过原子类等方式降低锁的开销,防止阻塞NioEventLoop线程。
- 2、ChannelHandler没有共享,但是在用户的ChannelPipeline中的一些ChannelHandler绑定了线程池,这样ChannelPipe的channelHandler就会被异步执行。(第一节中的Netty提供EventExecutorGroup机制来并行执行ChannelHandler)
3、内存池
Netty从4.X引入内存池机制,默认情况下,都是采用内存池模式创建ByteBuf对象,这也是其性能表现超级优异的一个原因之一,我们开发的过程中并不需要去考虑,通常情况下,我们是在编解码(ByteToMessageDecoder)时,从ByteBuf中读取数据到数组,然后进行解码操作,那么我们需要手动释放ByteBuf内存么?不释放会引起OOM么???
我们以下面一个例子来说明这个问题
image.png在上图这段代码中,涉及到两个ByteBuf,一个是框架分配的(msg),一个是业务中自定义的respMsg,如果出问题后,通常很容易想到是业务自定义byteBuf出问题,业务从内存池中申请了ByteBuf,但是没有主动释放它,看过源码后,可以发现这里并不会出问题,因为在调用ctx.writeAndFlush(respMsg)后,netty框架会主动释放内存。
那msg为啥会出问题呢,同样看下源码,尤其对比着看下io.netty.channel.SimpleChannelInboundHandler#channelRead的就知道了,继承ChannelInboundHandlerAdapter实现其channelRead方法后,是需要用户自己手动释放内存的。
所以,解决上面的问题有三种办法,
第一种,其实通过框架来实现,我们业务Handler实现SimpleChannelInboundHandler<ByteBuf>,在其io.netty.channel.SimpleChannelInboundHandler#channelRead方法中框架有做释放,ReferenceCountUtil.release(msg);
第二种,就是在代码的最后一行添加ReferenceCountUtil.release(msg)来释放ByteBuf
image.png第三种,在业务ChannelInboundHandler中调用ctx.fireChannelRead(msg)方法,让请求消息继续往后执行,直到调用DefaultChannelPipeline的内部类TailContext,由他来负责释放请求消息;io.netty.channel.DefaultChannelPipeline.TailContext#channelRead -> onUnhandledInboundMessage
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
注:像上面图中这样直接使用ctx.writeAndFlush(byteBuf)时,在NettyServerInitializer.initChannel的ChannelPipeline中是不需要再添加解码器Encoder()的,因为这里已经是直接对写入byteBuf了,看源码就知道,在解码器MessageToMessageEncoder中,也是把消息写入byteBuf,
以源码的ByteArrayEncoder为例,如下
@Sharable
public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]> {
@Override
protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception {
out.add(Unpooled.wrappedBuffer(msg));
}
}
总结:这个是很多netty初学者最容易出错的地方,网上的资料写的又经常互相矛盾。事实上我们只要按照 解码decoder——业务处理handler——编码encoder 这个流程来处理,基本上不会有内存释放的问题。
- 解码decoder——根据业务,把byteBuf中数据取出来,然后转换成你想要的数据(对象,或者数组);
- 业务处理handler(继承SimpleChannelInboundHandler)——直接构建返回对象,或者数据,然后调用ctx.writeAndFlush(respMsg);
- 编码encoder——通常不需要自定义,可以直接调用框架提供的,比如
ByteArrayEncoder、StringEncoder;
当我们有多种协议同时处理时,可以添加多个编码器,也可以不添加,但是要在业务handler处理中,就直接写入byteBuf
网友评论