美文网首页IT必备技能netty
Netty——任务加入异步线程池

Netty——任务加入异步线程池

作者: 小波同学 | 来源:发表于2021-03-15 00:00 被阅读0次

前言

我们常常遇到这样的需求:在一个业务逻辑处理器中,需要写数据库、进行网络连接等耗时业务。Netty的原则是不阻塞I/O线程,所以需指定Handler执行的线程池。

如果MyBusinessLogicHandler是一个耗时的处理逻辑,应该制定group,避免I/O线程被阻塞,如果业务逻辑是异步处理或处理时间很快那么可以不用指定group。

在 Netty 中做耗时的,不可预料的操作,比如数据库,网络请求,会严重影响 Netty 对 Socket 的处理速度。
而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2种方式,而且这2种方式实现的区别也蛮大的。

  • 1、处理耗时业务的第一种方式:handler 中加入线程池。
  • 2、处理耗时业务的第二种方式:Context 中添加线程池。

当我们使用addLast方法添加线程池后,handler将优先使用这个线程池,如果不添加,将使用IO线程。

handler 中加入线程池

服务端的实现

public class NettyServer {

    public static void main(String[] args) {
        //创建bossGroup 和 workerGroup
        //创建两个线程组,bossGroup和workerGroup
        //bossGroup只是处理连接请求,真正的和客户端业务处理会交给workerGroup处理
        //两个都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来进行设置
            bootstrap.group(bossGroup,workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端的通道实现
                    .option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接的个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    //给workerGroup的EventLoop对应的管道设置处理器
                    .childHandler(new NettyServerInitializer());
            System.out.println("......服务器 id ready...");
            //绑定一个端口并同步,生成一个ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(6668).sync();

            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

服务端Initializer

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new NettyServerHandler());
    }
}

服务端Handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //group充当业务线程池,可以将任务提交到该线程池
    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    /**
     * 读取数据事件(读取客户端发送的消息)
     * @param ctx   上下文对象,可以获取管道pipeline,通道channel,地址
     * @param msg   客户端发送的数据,默认Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("NettyServerHandler.channelRead 执行的线程:"+Thread.currentThread().getName());
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:"+ctx.channel().remoteAddress());
        //如果这里有一个非常耗时长的业务 -> 提交到线程池异步执行
        group.submit(() -> {
            try {
                //模拟耗时长的业务
                Thread.sleep(10 * 1000);
                System.out.println("group.submit 异步执行的线程:"+Thread.currentThread().getName());
                ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵2~", CharsetUtil.UTF_8));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println("......go on ......");
    }

    /**
     * 数据读取完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵1~", CharsetUtil.UTF_8));
    }

    /**
     * 发生异常,需要关闭通道
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}

Context 中添加线程池

服务端的实现

public class NettyServer {

    public static void main(String[] args) {
        //创建bossGroup 和 workerGroup
        //创建两个线程组,bossGroup和workerGroup
        //bossGroup只是处理连接请求,真正的和客户端业务处理会交给workerGroup处理
        //两个都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来进行设置
            bootstrap.group(bossGroup,workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端的通道实现
                    .option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接的个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    //给workerGroup的EventLoop对应的管道设置处理器
                    .childHandler(new NettyServerInitializer());
            System.out.println("......服务器 id ready...");
            //绑定一个端口并同步,生成一个ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(6668).sync();

            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

服务端Initializer

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        //在addLast添加handler,参数指定了EventExecutorGroup
        //那么该handler会优先加入到线程池中执行
        pipeline.addLast(group,"handler",new NettyServerHandler());
    }
}

服务端Handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据事件(读取客户端发送的消息)
     * @param ctx   上下文对象,可以获取管道pipeline,通道channel,地址
     * @param msg   客户端发送的数据,默认Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("NettyServerHandler.channelRead 执行的线程:"+Thread.currentThread().getName());
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:"+ctx.channel().remoteAddress());
        System.out.println("......go on ......");
    }

    /**
     * 数据读取完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵1~", CharsetUtil.UTF_8));
    }

    /**
     * 发生异常,需要关闭通道
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}

两种方式的比较

  • 1、第一种方式在handler内部添加EventExecutorGroup,可能更加自由,比如如果需要访问数据库等耗时操作那就异步,如果不需要那就不异步,异步可能会拖长接口响应时间,因为需要将任务放进mpscTask中,如果IO时间很短,Task很多,可能一个循环下来,都没有时间执行整个task,导致接口响应时间不达标。
  • 2、第二种方式是Netty标准方式(即加入到队列),但是这样做会将整个handler都交给业务线程池,不论耗时不耗时,都加入到队列里不够灵活。
  • 3、两种方式各有优劣,第一种灵活性更加,怎么使用,视实际情况而定。

相关文章

  • Netty——任务加入异步线程池

    前言 我们常常遇到这样的需求:在一个业务逻辑处理器中,需要写数据库、进行网络连接等耗时业务。Netty的原则是不阻...

  • Netty 异步线程池处理任务

    前提 在Netty中做耗时的,不可预料的操作,比如数据库的操作,网络请求等,会严重影响Netty对Socket的处...

  • java并发基础-线程池

    线程池主要解决两个问题:当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接...

  • 线程池

    多线程使用:使用注解方式注入线程池进行异步任务,避免手动方式创建线程池

  • AsyncTask的使用及其原理

    概述 Android 已封装好的轻量级异步类。内置一个线程池用于异步任务,另一个线程池用于排队(实际不是线程池)。...

  • [C# 线程处理系列]专题二:线程池中的工作者线程

    目录: 一、上节补充 二、CLR线程池基础 三、通过线程池的工作者线程实现异步 四、使用委托实现异步 五、任务 一...

  • 整整 Java 线程池

    整整 Java 线程池 用官方文档来说,线程池解决了两个问题: 一是在执行大量的异步任务时,因为线程池减少了任务开...

  • 【Java进阶营】整整 Java 线程池

    整整 Java 线程池用官方文档来说,线程池解决了两个问题: 一是在执行大量的异步任务时,因为线程池减少了任务开始...

  • HiExecutor

    全局通用的线程池组件-HiExecutor 支持任务优先级 支持线程池暂停、恢复、关闭 支持异步任务结果回调 Co...

  • 多线程并发编程17-线程池ThreadPoolExecutor源

    今天来说一说线程池ThreadPoolExecutor,线程池主要解决两个问题:一是当执行大量异步任务时线程池...

网友评论

    本文标题:Netty——任务加入异步线程池

    本文链接:https://www.haomeiwen.com/subject/zozqcltx.html