netty(七)初识Netty-EventLoop介绍

作者: 我犟不过你 | 来源:发表于2021-11-09 16:00 被阅读0次

    一、什么是Netty?

    Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

    Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

    二、使用Netty的著名组件有哪些?

    • Cassandra - nosql 数据库
    • Spark - 大数据分布式计算框架
    • Hadoop - 大数据分布式存储框架
    • RocketMQ - 阿里巴巴开源的消息队列
    • ElasticSearch - 搜索引擎
    • gRPC - rpc 框架
    • Dubbo - rpc 框架
    • Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
    • Zookeeper - 分布式协调框架

    三、简单使用

    下面我们先简单看下如何使用Netty完成hello world。

    在java中使用Netty,必然要引入其依赖。关于Netty的版本变更如下:

    • 2.x 2004
    • 3.x 2008
    • 4.x 2013
    • 5.x 已废弃(没有明显的性能提升,维护成本高)

    如上所示,5.x没有太大的意义,所以我们这里也是用maven4.x的版本,首先在项目中引入maven依赖:

    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.52.Final</version>
    </dependency>
    

    服务端代码:

    public class HelloWorldServer {
    
        public static void main(String[] args) {
            new ServerBootstrap()
                    // 1、创建 NioEventLoopGroup,可以简单理解为 `线程池 + Selector`
                    .group(new NioEventLoopGroup())
                    // 2、选择服务 ServerSocketChannel 实现类,这里选择Nio
                    .channel(NioServerSocketChannel.class)
                    // 3、此处是给客户端SocketChannel使用,ChannelInitializer执行一次,待客户端建立连接后,执行initChannel,添加更多的处理器
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            // 5、客户端SocketChannel处理器,解码:ByteBuffer -> String
                            ch.pipeline().addLast(new StringDecoder());
                            // 6、客户端SocketChannel业务处理器,使用上一个处理器的结果
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                    System.out.println(msg);
                                }
                            });
                        }
                    })
                    // 4、服务端ServerSocketChannel绑定监听端口
                    .bind(8080);
        }
    }
    

    客户端代码:

    public class HelloWorldClient {
    
        public static void main(String[] args) throws InterruptedException {
            new Bootstrap()
                    // 1、创建 NioEventLoopGroup,可以简单理解为 `线程池 + Selector`
                    .group(new NioEventLoopGroup())
                    // 2、选择服务 ServerSocketChannel 实现类,这里选择Nio
                    .channel(NioSocketChannel.class)
                    // 3、此处是给客户端SocketChannel使用,ChannelInitializer执行一次,待客户端建立连接后,执行initChannel,添加更多的处理器
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) {
                            // 8、消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    // 4、指定要连接的服务器端口
                    .connect("127.0.0.1", 8080)
                    // 5、同步方法,等待connect()连接完毕
                    .sync()
                    // 6、获取channel对象,即通道,可读写操作
                    .channel()
                    // 7、写入消息并清空缓冲区
                    .writeAndFlush(new Date() + ": hello world!");
        }
    }
    

    结果:

    Tue Nov 09 10:21:18 CST 2021: hello world!
    

    可以使用以下方式快速理解和记忆:

    • 把 channel 理解为数据的通道
    • 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
    • 把 handler 理解为数据的处理工序
      • 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成...)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
      • handler 分 Inbound 和 Outbound 两类
    • 把 eventLoop 理解为处理数据的工人
      • 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
      • 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
      • 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人

    三、主要组件

    3.1 EventLoop

    根据其名称直译叫做“事件循环”对象,其用于处理channel的IO操作。

    EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

    其依赖关系如下:

    image.png

    如上如所示,其继承关系较复杂。

    • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法。
    • 另一条线是继承自 netty 自己的 OrderedEventExecutor。

    3.2 EventLoopGroup

    事件循环组。

    EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

    • 继承自 netty 自己的 EventExecutorGroup
      • 实现了 Iterable 接口提供遍历 EventLoop 的能力
      • 另有 next 方法获取集合中下一个 EventLoop

    3.2.1 常用EventLoopGroup

    主要有以下两种EventLoopGroup:
    1)NioEventLoopGroup:处理IO事件,普通任务,定时任务
    2)DefaultEventLoopGroup:处理普通任务,定时任务

    这里先给出一个结论,EventLoopGroup当中的每一个EventLoop,和客户端Channel实际是绑定的:简单来说,就是一个channel发送的内容,会被同一个线程处理,后面代码会体现。

    3.2.2 EventLoopGroup代码使用示例

    下面以NioEventLoopGroup为例,介绍简单使用:

    3.2.2.1 遍历EventLoopGroup:

    public class TestEventLoopGroup {
    
        public static void main(String[] args) {
            //构造方法可以指定线程数,默认不设置会首先根据Netty的环境变量,否则根据线程核心数*2,最小为1
            NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(2);
    
            // 使用期next方法 获取内部的EventLoop
            System.out.println(nioEventLoopGroup.next());
            System.out.println(nioEventLoopGroup.next());
            System.out.println(nioEventLoopGroup.next());
    
            System.out.println("-------------------------------------");
    
            // for循环获取内部的EventLoop
            for (EventExecutor group: nioEventLoopGroup) {
                System.out.println(group);
            }
        }
    }
    

    结果:

    io.netty.channel.nio.NioEventLoop@294425a7
    io.netty.channel.nio.NioEventLoop@67d48005
    io.netty.channel.nio.NioEventLoop@294425a7
    -------------------------------------
    io.netty.channel.nio.NioEventLoop@294425a7
    io.netty.channel.nio.NioEventLoop@67d48005
    
    Process finished with exit code 0
    
    

    3.2.2.2 执行普通任务 和 定时任务

    public class TestEventLoopGroup {
    
        public static void main(String[] args) throws InterruptedException {
            //构造方法可以指定线程数,默认不设置会首先根据Netty的环境变量,否则根据线程核心数*2,最小为1
            NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(2);
    
            // 执行普通任务
            nioEventLoopGroup.next().execute(TestEventLoopGroup::print);
    
            // 执行定时任务,延后一秒打印
            System.out.println(new Date());
            nioEventLoopGroup.next().schedule(TestEventLoopGroup::print, 1000, TimeUnit.MILLISECONDS);
    
        }
    
        private static void print() {
            System.out.println(new Date() + " " +Thread.currentThread());
        }
    }
    

    结果:

    Tue Nov 09 14:45:50 CST 2021
    Tue Nov 09 14:45:50 CST 2021 Thread[nioEventLoopGroup-2-1,10,main]
    Tue Nov 09 14:45:51 CST 2021 Thread[nioEventLoopGroup-2-2,10,main]
    

    3.2.2.3 执行IO任务:

    其实所谓IO任务就是,客户端和服务端的通信,在此例子的基础上,我们在添加一个职责划分的概念。

    1)职责划分1
    何为职责划分?就是在我们创建EventLoopGroup时,指定两个,不使用单一的一个,让职责更加明确,其构造方法如下所示:

        /**
          * 为父级(接受者)和子级(客户端)设置EventLoopGroup 。
          */
        public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
            super.group(parentGroup);
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
            return this;
        }
    

    在上面的构造当中,第一个参数负责ServerSocketChannel的accept操作,而第二个参数负责SocketChannel的读写。

    关于第一个参数是否需要指定线程数据呢?实际只需要指定为1即可,因为ServerSocketChannel只有一个,只会绑定一个EventLoop。

    第二个参数是工作线程,根据实际工作需要设置,默认不设置会首先根据Netty的环境变量,否则根据线程核心数*2,最小为1。

    下面进行代码的演示,模拟两个EventLoop处理事件。

    根据前面的分析,这里我们设置两个EventLoopGroup,第一个给1或者不设置,第二个需要注意了,如果给1的话,那表示只会有一个线程在进行SocketChannel的读写操作,并不是两个线程同时操作,所以下面我们给第二个参数的线程数设置为2。

    服务端代码:

    public class EventLoopGroupServer {
    
        public static void main(String[] args) throws InterruptedException {
            new ServerBootstrap()
                    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                    System.out.println(Thread.currentThread().getName() + ": " + s);
                                }
                            });
                        }
                    }).bind(8080).sync();
        }
    }
    

    分别依次启动三个客户端,每个客户端发送两个不同的字符串消息(分别是aaa,bbb,ccc),下面只提供aaa,看结果。
    客户端代码:

    public class EventLoopGroupClient {
    
        public static void main(String[] args) throws InterruptedException {
            Channel channel = new Bootstrap()
                    .group(new NioEventLoopGroup(1))
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            System.out.println("init...");
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .channel(NioSocketChannel.class).connect("localhost", 8080)
                    .sync()
                    .channel();
    
            channel.writeAndFlush("aaa");
            Thread.sleep(1000);
            channel.writeAndFlush("aaa");
        }
    }
    

    结果:

    nioEventLoopGroup-3-1: aaa
    nioEventLoopGroup-3-1: aaa
    nioEventLoopGroup-3-2: bbb
    nioEventLoopGroup-3-2: bbb
    nioEventLoopGroup-3-1: ccc
    nioEventLoopGroup-3-1: ccc
    

    根据结果我们得到结论:

    客户端SocketChannel会和EventLoop进行绑定,后面发送的消息,依然由其处理。
    下一个客户端连接后,会默认轮询到下一个EventLoop。
    第三个客户端来的时候,又会连接第一EventLoop,其内部是多路复用,一个EventLoop管理多个channel。

    2)职责划分2
    前面讲了一种职责划分,是在ServerSocketChannel和SocketChannel的划分。

    针对SocketChannel还可以进一步的划分,实现方式就是我们可以指定另外一个EventLoopGroup,具体如下所示:

    public class EventLoopGroupServer {
    
        public static void main(String[] args) throws InterruptedException {
            // 定义一个defaultEventLoopGroup,对职责进一步划分
            DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
    
            new ServerBootstrap()
                    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            // 解码处理器
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                    System.out.println(Thread.currentThread().getName() + ": " + s);
                                    // 要想使下了一个处理器,能够收到此处理器的结果,需要使用西面这个方法传递
                                    channelHandlerContext.fireChannelRead(s);
                                }
                                // 添加另一个处理器,使用额外的EventLoopGroup
                            }).addLast(defaultEventLoopGroup,"otherHandler",new SimpleChannelInboundHandler<String>(){
                                @Override
                                protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                    System.out.println(Thread.currentThread().getName() + ": " + s);
                                }
                            });
                        }
                    }).bind(8080).sync();
        }
    }
    

    注意使用 channelHandlerContext.fireChannelRead(s)在处理器传递。

    客户端随便发送个消息,我们看看两个处理器的打印内容:

    nioEventLoopGroup-4-2: Tue Nov 09 15:44:47 CST 2021: hello world!
    defaultEventLoopGroup-2-2: Tue Nov 09 15:44:47 CST 2021: hello world!
    

    连个处理内容分别通过不同的EventLoop处理,分别是原本的nio和后创建的default。

    并且,同一个客户端再次发送内容,此处两个线程仍然会是和channel进行绑定。不具体演示了。

    我们最后看下上述的两个处理器如何切换不同的线程处理的?

    通过 channelHandlerContext.fireChannelRead(s)向下跟踪,到以下代码处:

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    
        // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程,此处就是EventLoop
        EventExecutor executor = next.executor();
        
        // 是,直接调用
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } 
        // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
        else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    

    看到这了,有帮助的话帮忙点个赞吧~~

    相关文章

      网友评论

        本文标题:netty(七)初识Netty-EventLoop介绍

        本文链接:https://www.haomeiwen.com/subject/qjdmzltx.html