认识Netty

作者: iDevOps | 来源:发表于2019-11-25 14:12 被阅读0次

    简介

    Netty是由Jboss提供的一个异步的、基于事件驱动的Java网络应用框架,用来快速开发高性能、高可靠性的网络IO程序。
    本质上是一个NIO框架,所以想要彻底理解Netty,需要先搞明白什么是NIO

    应用场景

    用于开发RPC框架
    分布式系统中,各个节点之间需要远程服务调用,高性能的RPC框架是必不可少的,Netty作为异步高性能呢过的通信框架,往往作为基础通信组件整合到RPC框架中使用。例如阿里的Dubbo就是使用Netty作为基础通信组件。

    游戏行业
    Netty作为高性能的基础通信组件,提供了TCP/UDP和HTTP协议栈,方便定制和开发私有协议栈,可以用来开发账号登陆服务器、地图服务器之间的高性能通信。

    大数据
    Hadoop的高性能通信和序列化组件Avro的RPC框架,默认采用的就是Netty进行跨界点通信,它的Netty Service就是基于Netty框架二次封装实现。

    原生NIO存在的问题

    NIO的类库和API繁杂,使用麻烦,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
    需要熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写高质量的NIO程序。
    开发工作量和难度都非常大,例如客户端断开重练、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等问题。
    JDK NIO存在bug,例如Epoll Bug,会导致Selector空轮询,最终导致CPU100%,知道JDK1.7版本该问题仍旧存在,没有根本解决。

    Nettty的优点

    Netty对JDK自带的NIO的API进行封装,解决了上述问题。

    设计优雅,适用于各种传输类型的统一API阻塞和非阻塞Socket,基于灵活且可扩展的事件模型,可以清晰地分离关注点,高度可定制的线程模型,单线程或一个或多个线程池。

    高性能、高吞吐量、低延迟、减少资源消耗、最小化不必要的内存复制(零拷贝)

    安全,完整SSL/TLS和StartTLS支持

    社区活跃、发现Bug能及时修复,同时更多新功能会被加入

    Netty版本

    Netty版本分别是netty3.x、netty4.x、netty5.x
    Netty5出现重大bug,已经被官网废弃,目前推荐使用Netty4.x稳定版本

    Netty架构设计

    不同的线程模型,对程序性能有很大影响,目前存在的线程模型有:传统的阻塞I/O服务模型和Reactor模式。Netty线程模型主要基于Reactor多线程模型做了一定的改进。

    传统阻塞I/O模型

    采用阻塞IO模式获取输入的数据,每个连接都需要独立的线程完成数据的输入、业务处理和数据返回。

    问题:
    并发量很大,会创建大量的线程,占用很大系统资源。
    连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费。


    传统阻塞IO模型示意图
    Reactor模式

    Reactor模式基本设计思想就是I/O复用结合线程池

    基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。

    基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。

    根据Reactor的数量和处理资源线程的数量不同,Reactor模式可以分为3种不同的实现

    单Reactor单线程
    单Reactor多线程
    主从Reactor多线程

    • 单Reactor单线程

    1.Reactor对象通过Select监控客户端请求,收到事件后通过Dispatch进行分发
    2.如果是连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接后的后续业务
    3.如果是处理请求事件,则Reactor分发给对应的Handler来处理
    4.Handler会完成read、业务处理、send等业务流程


    单Reactor单线程示意图

    总结:这种模式,服务端用一个线程通过多路复用搞定了所有的IO操作,代码简单,清晰明了,但是所有的事情都在一个线程上处理,无法发挥多核CPU的性能,Handler在处理某个连接上的业务时,整个进程就无法处理其他连接事件,很容易导致性能瓶颈,还有就是如果线程意外终止,就会进入死循环,会导致整个系统通信不可用。

    • 单Reactor多线程

    1.Reactor对象通过select监控客户端请求事件,收到事件后,通过dispatch进行转发
    2.如果是连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接后的后续业务
    3.如果是处理请求事件,则Reactor分发给对应的Handler来处理
    4.handler只负责响应事件,不做具体的业务处理,通过read读取数据,然后分发给worker线程池的某个线程处理业务
    5.worker线程池会分配独立的线程完成业务处理,并将结果返回给handler
    6.handler收到响应后,通过send将结果返回给客户端


    单Reactor多线程原理示意图

    总结:可以充分利用多核CPU的处理能力,但是多线程之间数据共享和访问比较复杂,Reactor处理所有的事件的监听和响应,在高并发场景下容易出现性能瓶颈。

    • 主从Reactor多线程

    1.Reactor主线程对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件
    2.当Acceptor处理连接事件后,主Reactor将连接分配给子Reactor
    3.子Reactor将连接加入到连接队列进行监听,并创建handler进行各种事件的处理
    4.当有新事件,子Reactor会调用对应的handler进行处理
    5.handler通过read读取数据,分发给后面的worker线程处理
    6.worker线程池分配独立的worker线程进行业务处理,并返回结果
    7.handler收到响应结果后,再通过send将结果返回给client


    主从Reactor多线程原理图

    注:一个Reactor主线程可以对应多个Reactor子线程(这个在图上没有展示)。
    总结:
    优点,主线程和子线程职责明确,主线程只接收连接,子线程完成后续业务操作,主线程和子线程数据交互简单,主线程只需要把新连接传给子线程,子线程无需返回数据。
    缺点,编程复杂度有点高。
    这种模式在很多项目中广泛使用,例如Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型。

    Netty模型

    1.Netty抽象出两个线程池BossGroup和WorkerGroup,BossGroup负责接收客户端的连接,WorkerGroup负责网络的读写,它们的类型都是NioEventLoopGroup。
    2.NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环都是NioEventLoop
    3.NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯
    4.NioEventLoopGroup可以有多个线程,即可以有多个NioEventLoop
    5.每个Boss NioEventLoop循环执行3步
    ①轮询accept事件
    ②处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个worker NioEventLoop上的selector
    ③处理任务队列中的任务,即runAllTasks
    6.每个Worker NioEventLoop执行步骤
    ①轮询read、write事件
    ②处理I/O事件,即read、write事件,在对应的NioSocketChannel上处理
    ③处理任务队列的任务,即runAllTasks
    7.每个Worker NioEventLoop处理业务时,会使用pipeline,pipeline中包含了Channel,通过pipeline可以获取到对应的通道,管道中维护了很多的处理器


    Netty模型原理图

    理论知识就整理到这,下面来个入门案例。

    入门案例

    • 依赖
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.37.Final</version>
    </dependency>
    
    • 服务端
    public class NettyServer {
    
        public static void main(String[] args) throws Exception {
    
            /**
             * 创建两个线程组bossGroup和workerGroup
             * bossGroup只负责连接请求, workerGroup负责处理客户端业务
             * 两个线程组包含的子线程个数: 默认是cpu核数 * 2
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                //服务器启动对象
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                         .channel(NioServerSocketChannel.class)
                         .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到的两个个数
                         .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
                         .childHandler(new ChannelInitializer<SocketChannel>() {
                             protected void initChannel(SocketChannel socketChannel) throws Exception {
                                 //给pipeline设置处理器
                                 socketChannel.pipeline().addLast(new NettyServerHandler());
                             }
                         });
                //绑定一个端口并且同步, 生成一个ChannelFuture对象
                ChannelFuture cf = bootstrap.bind(10000).sync();
                cf.channel().closeFuture().sync();
            }finally {
                //优雅的关闭
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * @param ctx 上下文对象
         * @param msg 客户端发送的数据, 默认为Object
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("服务器线程名: "+Thread.currentThread().getName());
            System.out.println("ctx: "+ctx);
            ByteBuf buf = (ByteBuf)msg;
            System.out.println("客户端消息: "+buf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址: "+ctx.channel().remoteAddress());
        }
    
        /**
         * 数据读取完毕
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //将数据写入缓存并刷新
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!!!", CharsetUtil.UTF_8));
        }
    
        /**
         * 处理异常, 一般是需要关闭通道
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
    • 客户端
    public class NettyClient {
    
        public static void main(String[] args) throws Exception{
            //客户端一个事件循环组就可以了
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                //启动类
                Bootstrap bootstrap = new Bootstrap();
                //设置启动参数
                bootstrap.group(group)
                        .channel(NioSocketChannel.class) //设置线程组
                        .handler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器
                            }
                        });
                //启动客户端连接服务器端
                ChannelFuture cf = bootstrap.connect("127.0.0.1", 10000).sync();
                cf.channel().closeFuture().sync();
            }finally {
                group.shutdownGracefully();
            }
        }
    }
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 通道就绪触发该方法
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8));
        }
    
        /**
         * 当通道有读取事件时, 会触发
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
            System.out.println("服务器:" + ctx.channel().remoteAddress() + "  "+ buf.toString(CharsetUtil.UTF_8));
        }
    
        /**
         * 异常处理
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    • 测试

    分别启动服务端和客户端

    # 服务端输出
    服务器线程名: nioEventLoopGroup-3-1
    ctx: ChannelHandlerContext(NettyServerHandler#0, [id: 0x2f14e16a, L:/127.0.0.1:10000 - R:/127.0.0.1:55613])
    客户端消息: hello server
    客户端地址: /127.0.0.1:55613
    
    # 客户端输出
    服务器:/127.0.0.1:10000  hello, 客户端!!!
    

    相关文章

      网友评论

        本文标题:认识Netty

        本文链接:https://www.haomeiwen.com/subject/vzzfwctx.html