美文网首页IT必备技能netty
Netty——任务加入异步线程池

Netty——任务加入异步线程池

作者: 小波同学 | 来源:发表于2021-03-15 00:00 被阅读0次

    前言

    我们常常遇到这样的需求:在一个业务逻辑处理器中,需要写数据库、进行网络连接等耗时业务。Netty的原则是不阻塞I/O线程,所以需指定Handler执行的线程池。

    如果MyBusinessLogicHandler是一个耗时的处理逻辑,应该制定group,避免I/O线程被阻塞,如果业务逻辑是异步处理或处理时间很快那么可以不用指定group。

    在 Netty 中做耗时的,不可预料的操作,比如数据库,网络请求,会严重影响 Netty 对 Socket 的处理速度。
    而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2种方式,而且这2种方式实现的区别也蛮大的。

    • 1、处理耗时业务的第一种方式:handler 中加入线程池。
    • 2、处理耗时业务的第二种方式:Context 中添加线程池。

    当我们使用addLast方法添加线程池后,handler将优先使用这个线程池,如果不添加,将使用IO线程。

    handler 中加入线程池

    服务端的实现

    public class NettyServer {
    
        public static void main(String[] args) {
            //创建bossGroup 和 workerGroup
            //创建两个线程组,bossGroup和workerGroup
            //bossGroup只是处理连接请求,真正的和客户端业务处理会交给workerGroup处理
            //两个都是无限循环
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                //创建服务器端的启动对象,配置参数
                ServerBootstrap bootstrap = new ServerBootstrap();
                //使用链式编程来进行设置
                bootstrap.group(bossGroup,workerGroup)//设置两个线程组
                        .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端的通道实现
                        .option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接的个数
                        .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                        //给workerGroup的EventLoop对应的管道设置处理器
                        .childHandler(new NettyServerInitializer());
                System.out.println("......服务器 id ready...");
                //绑定一个端口并同步,生成一个ChannelFuture对象
                ChannelFuture channelFuture = bootstrap.bind(6668).sync();
    
                //对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    }
    

    服务端Initializer

    public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new NettyServerHandler());
        }
    }
    

    服务端Handler

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        //group充当业务线程池,可以将任务提交到该线程池
        private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
    
        /**
         * 读取数据事件(读取客户端发送的消息)
         * @param ctx   上下文对象,可以获取管道pipeline,通道channel,地址
         * @param msg   客户端发送的数据,默认Object
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("NettyServerHandler.channelRead 执行的线程:"+Thread.currentThread().getName());
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:"+ctx.channel().remoteAddress());
            //如果这里有一个非常耗时长的业务 -> 提交到线程池异步执行
            group.submit(() -> {
                try {
                    //模拟耗时长的业务
                    Thread.sleep(10 * 1000);
                    System.out.println("group.submit 异步执行的线程:"+Thread.currentThread().getName());
                    ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵2~", CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            System.out.println("......go on ......");
        }
    
        /**
         * 数据读取完毕
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵1~", CharsetUtil.UTF_8));
        }
    
        /**
         * 发生异常,需要关闭通道
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.channel().close();
        }
    }
    

    Context 中添加线程池

    服务端的实现

    public class NettyServer {
    
        public static void main(String[] args) {
            //创建bossGroup 和 workerGroup
            //创建两个线程组,bossGroup和workerGroup
            //bossGroup只是处理连接请求,真正的和客户端业务处理会交给workerGroup处理
            //两个都是无限循环
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                //创建服务器端的启动对象,配置参数
                ServerBootstrap bootstrap = new ServerBootstrap();
                //使用链式编程来进行设置
                bootstrap.group(bossGroup,workerGroup)//设置两个线程组
                        .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端的通道实现
                        .option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接的个数
                        .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                        //给workerGroup的EventLoop对应的管道设置处理器
                        .childHandler(new NettyServerInitializer());
                System.out.println("......服务器 id ready...");
                //绑定一个端口并同步,生成一个ChannelFuture对象
                ChannelFuture channelFuture = bootstrap.bind(6668).sync();
    
                //对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    }
    

    服务端Initializer

    public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    
        private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
    
            //在addLast添加handler,参数指定了EventExecutorGroup
            //那么该handler会优先加入到线程池中执行
            pipeline.addLast(group,"handler",new NettyServerHandler());
        }
    }
    

    服务端Handler

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 读取数据事件(读取客户端发送的消息)
         * @param ctx   上下文对象,可以获取管道pipeline,通道channel,地址
         * @param msg   客户端发送的数据,默认Object
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("NettyServerHandler.channelRead 执行的线程:"+Thread.currentThread().getName());
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:"+ctx.channel().remoteAddress());
            System.out.println("......go on ......");
        }
    
        /**
         * 数据读取完毕
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵1~", CharsetUtil.UTF_8));
        }
    
        /**
         * 发生异常,需要关闭通道
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.channel().close();
        }
    }
    

    两种方式的比较

    • 1、第一种方式在handler内部添加EventExecutorGroup,可能更加自由,比如如果需要访问数据库等耗时操作那就异步,如果不需要那就不异步,异步可能会拖长接口响应时间,因为需要将任务放进mpscTask中,如果IO时间很短,Task很多,可能一个循环下来,都没有时间执行整个task,导致接口响应时间不达标。
    • 2、第二种方式是Netty标准方式(即加入到队列),但是这样做会将整个handler都交给业务线程池,不论耗时不耗时,都加入到队列里不够灵活。
    • 3、两种方式各有优劣,第一种灵活性更加,怎么使用,视实际情况而定。

    相关文章

      网友评论

        本文标题:Netty——任务加入异步线程池

        本文链接:https://www.haomeiwen.com/subject/zozqcltx.html