美文网首页netty程序员Java学习笔记
Netty(六):EventLoop 与 EventLoopGr

Netty(六):EventLoop 与 EventLoopGr

作者: 聪明的奇瑞 | 来源:发表于2018-03-19 17:42 被阅读114次

    Neety 是 Reactor 模型的一个实现,那么什么是 Reactor 模型呢?

    关于 Reactor 线程模型

    • Reactor 模型是基于事件驱动的,特别适合处理海量的 I/O 事件
    • 单线程模型
      • 单线程模型指的是所有的 I/O 操作都是在同一个 NIO 线程上面完成,由于 Reactor 模型使用的是 NIO,I/O 操作不会导致阻塞,理论上一个线程可以独立处理所有 I/O 相关的操作
      • 从架构层面看,一个 NIO 线程确实可以完成其承担的职责,例如通过 Acceptor 类接收客户端的 TCP 连接请求,链路建立成功后通过 Dispatch 将对应的 ByteBuffer 派发到指定的 Handler 上进行消息处理并响应客户端
      • 但一个 NIO 线程同时处理成百上千的链路,性能上无法支撑,即便 NIO 线程的 CPU 符合达到 100%,也无法满足海量消息处理。当负荷后处理速度变慢,导致大量客户端连接超时,最终导致大量消息积压和超时。且一旦 NIO 线程发生故障则会导致整个通信模块不可用

    单线程模型
    • 多线程模型
      • 多线程模型与单线程模型最大区别就是有一组 NIO 线程处理 I/O 操作
      • 有专门一个 NIO 线程(Acceptor)用于接收客户端 TCP 连接请求,读写 I/O 操作由一个 NIO 线程池负责
      • 一个 NIO 线程可以同时处理 N 条链路,但一个链路只对应一个 NIO 线程,防止并发操作问题

    多线程模型
    • 主从多线程模型
      • 绝大多数场景下,多线程模型都可以满足性能需求,但是再极个别特殊场景中,一个 NIO 线程处理客户端连接请求可能会存在性能问题,例如百万客户端连接,在这种情况下单独一个 Acceptor 线程可能会存在性能问题,为了解决性能问题,产生了主从多线程模型
      • 它的特点是:服务端用于接收客户端连接的不再是单独一个 NIO 线程,而是一个独立的 NIO 线程池
      • Acceptor 接收到客户端 TCP 连接请求处理完成后(可能包含接入认证等),将新创建的 SocketChannel 注册到 I/O 线程池的某个线程上,由它负责 SocketChannel 的读写和编解码工作
      • Acceptor 线程池仅仅只用于客户端的登陆、握手和安全认证,由 I/O 线程负责后续的 I/O 操作

    主从线程模型

    你可以先这样理解,EventLoop 是一个线程、EventLoopGroup 本质是一个线程池

    NioEventLoopGroup 与 Reactor 线程模型的对应

    • 上面介绍了三种 Reactor 的线程模型, 那么它们和 NioEventLoopGroup 又有什么关系呢? 其实, 不同的设置 NioEventLoopGroup 的方式就对应了不同的 Reactor 的线程模型
    • 单线程模型
      • 下面代码实例化了一个 NioEventLoopGroup,构造器参数为 1(表示线程池大小)
      EventLoopGroup bossGroup = new NioEventLoopGroup(1);
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup)
       .channel(NioServerSocketChannel.class)
      ...
      
      • 值得注意的是这里只使用了一个 bossGorup,通过查看 ServerBootstrap 重写的 group 方法可以得知,当传入一个 group 时,那么 bossGourp 和 workGourp 就是同一个 NioEventLoopGroup,且这个 NioEventLoopGroup 只有一个线程,这样就会导致 Netty 中的 acceptor 和后续的所有客户端连接的 I/O 操作都是在同一个线程中处理的。就相当于 Reactor 单线程模型
      @Override
      public ServerBootstrap group(EventLoopGroup group) {
          return group(group, group);
      }
      
    • 多线程模型
      • bossGroup 中只有一个线程, 而 workerGroup 中的线程是 CPU 核心数乘以2, 对应 Reactor 多线程模型
      EventLoopGroup bossGroup = new NioEventLoopGroup(1);
      EventLoopGroup workerGroup = new NioEventLoopGroup();
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup)
       .channel(NioServerSocketChannel.class)
       ...
      
    • 主从多线程模型
      EventLoopGroup bossGroup = new NioEventLoopGroup(4);
      EventLoopGroup workerGroup = new NioEventLoopGroup();
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup)
       .channel(NioServerSocketChannel.class)
       ...
      
    • 但 Netty 没有使用主从多线程模型,服务器端的 ServerSocketChannel 只绑定到了 bossGroup 中的一个线程, 因此在调用 Java NIO 的 Selector.select 处理客户端的连接请求时, 实际上是在一个线程中的, 所以对只有一个服务的应用来说, bossGroup 设置多个线程是没有什么作用的, 反而还会造成资源浪费

    NioEventLoopGroup

    NioEventLoopGroup
    • EventLoopGroup(其实是 MultithreadEventExecutorGroup)内部维护一个类型为 EventExecutor 的数组,其大小时 nThreads,这样就构成了一个线程池
    • 如果我们在实例化 NioEventLoopGroup 时指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2
    • 抽象方法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例
    • NioEventLoop 属性:
      • SelectorProvider provider 属性: NioEventLoopGroup 构造器中通过 SelectorProvider.provider() 获取一个 SelectorProvider
      • Selector selector 属性: NioEventLoop 构造器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.

    NioEventLoop

    • NioEventLoop 肩负着两种任务:
      • 执行与 Channel 相关的 I/O 操作, 包括 调用 select 等待就绪的 I/O 事件、读写数据与数据的处理等
      • 作为任务队列, 执行 taskQueue 中的任务, 例如调用 eventLoop.schedule 提交的定时任务也是这个线程执行的
    • Netty 中, 每个 Channel 都有且仅有一个 EventLoop 与之关联

    NioEventLoop 类层次结构

    NioEventLoop
    • NioEventLoop 继承链如下:
    NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor
    
    • AbstractScheduledEventExecutor 中实现了 NioEventLoop 的 schedule 功能,即我们可以通过 NioEventLoop 实例的 schedule 方法来运行一些定时任务
    • SingleThreadEventExecutor 内部通过 threadFactory.newThread 创建了一个新的 Java 线程,一个 NioEventLoop 和一个特定的线程绑定,且在其生命周期内都不会改变。这个线程中所做的事情主要就是调用其 run() 方法
    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
        this.parent = parent;
        this.addTaskWakesUp = addTaskWakesUp;
    
        thread = threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // 省略清理代码
                    ...
                }
            }
        });
        threadProperties = new DefaultThreadProperties(thread);
        taskQueue = newTaskQueue();
    }
    
    • SingleThreadEventLoop 实现了任务队列的功能, 通过它我们可以调用 NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.

    EventLoop 的启动

    • NioEventLoop 本身就是一个 SingleThreadEventExecutor, 因此 NioEventLoop 的启动, 其实就是 NioEventLoop 所绑定的本地 Java 线程的启动
    • 从代码中搜索, thread.start() 被封装到 SingleThreadEventExecutor.startThread() 方法中
    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                thread.start();
            }
        }
    }
    
    • 而 startThread 是在 SingleThreadEventExecutor.execute 方法中调用的
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
    
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread(); // 调用 startThread 方法, 启动EventLoop 线程.
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
    
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    
    • 当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动.

    相关文章

      网友评论

        本文标题:Netty(六):EventLoop 与 EventLoopGr

        本文链接:https://www.haomeiwen.com/subject/cojlqftx.html