美文网首页Java
牛了Netty之EventLoop线程

牛了Netty之EventLoop线程

作者: 你比天气还晴朗 | 来源:发表于2020-04-16 08:00 被阅读0次

在使用Netty时,我们老生常谈的是避免在Netty的IO线程中做耗费时间的业务操作和一些长时间的阻塞。本文将通过对Netty中核心类的剖析,帮助读者理解这个问题以及其中的EventLoop设计,从而在使用Netty时更加得心应手。

用笔者的理解结合EventLoop维基百科的说明,总结如下

EventLoop是计算机科学中的常用设计模型,其等待事件的就绪并且对事件进行调度分发。EventLoop通常会阻塞等待直到事件就绪。然后调用相关的处理程序。EventLoop通常和Reactor模型一起结合使用。EventLoop并且事件产生源通常和EventLoop不在同一个线程

EventLoop整个是一个大循环,用伪代码来解释:

while not shotdown:
    message = get_next_message()
    process_message(message)

EventLoop在计算机系统设计中应用广泛,常用于网络库,例如本文重点介绍的Netty、JS的并发模型也建立在EventLoop之上。

本文将主要介绍EventLoop在Netty中的实现,帮助大家理解Netty底层的运作机制中核心一环。

以EventLoop接口中的doc开局,

Will handle all the I/O operations for a Channel once registered. One EventLoop instance will usually handle more than one Channel but this may depend on implementation details and internals.

其中说到EventLoop通常为多个channel服务,监听并处理这些注册上来的channel的就绪的IO操作。其实在Netty中,不仅是这些channel的IO操作,包括和channel关联的的定时事件也会交由对应的EventLoop进行执行。这一点,后文会分析到。借用<<Netty In Action>>书中的图片来阐述这种关联关系。

image

一旦一个 Channel 被分配给一个 EventLoop,该Channel的整个生命周期中都使用这个EventLoop。这种做法带来最显而易见的好处是避免了多个线程操作同一个Channel和该Channel上资源。从而免除很多线程安全问题,实现局部无锁,在消除了锁带来的性能影响之外,也无需开发人员去关心其上的并发问题,降低了使用网络通讯库的开发门槛。

Netty中的EventLoop有多种实现,有基于select的NioEventLoop、基于Epoll的EpollEventLoop以及macOS下的KQueueEventLoop等。本文以NioEventLoop举例,该类的继承关系如下:

WX20180519-161049@2x.png

SingleThreadEventExecutor类中定义了Thread和taskQueue任务队列。该Thread就是该Eventloop绑定的Thread。通过该Thread执行该EventLoop上的所有Channel的IO操作、任务操作。除此之外,可以看到SingleThreadEventExecutor类还往上继承了AbstractScheduledEventExecutor类,该抽象类中通过优先级队列提供了EventLoop对于定时任务的支持,定时任务典型的典型使用场景就是Nettty原生提供的对于心跳的支持类IdleStateHandler,当然扯远了,这并不是本文的重点,之后会再写篇文章详细解剖该类的打开方式以及原理。

NioEventLooprun方法是Netty Reactor事件模型的主体,其本质就是一个大循环,如下所示:

 @Override
    protected void run() {
        for (;;) {
            try {
                // 根据当前的selectStrategy进行判断当前是否要进入select环节或者跳过
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false)); // 阻塞等待
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {  // ioRatio表示执行IO事件的时间比率,如果设置了100,则会优先执行完所有的IO操作,                                      //  再去
                    try {
                        processSelectedKeys(); //处理IO事件
                    } finally {
                        runAllTasks(); //执行非IO事件
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime; //计算执行IO事件花费的时间
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //根据比例计算执行非IO事件所应该的时间
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            ... 省略
        }
    }

NioEventLoop通过一个线程干了所有Channel的IO事件和定时事件。这就是文章开头说到的为什么Netty中的handler中或者定时事件中不建议运行长时间的业务逻辑。

作为一个大轮询,当然也不能让他一直没事干也跑着。这会浪费大量的CPU资源。最好的方法就是我没事干的时候就释放CPU资源让CPU去做别的事,有事干的时候就立马能响应过来恢复到一线。select(wakenUp.getAndSet(false));干的就基本是这事。

   private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // ①

            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; // ②
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        // 立即
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                if (hasTasks() && wakenUp.compareAndSet(false, true)) { // ③
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis); // ④
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                   //⑤
                    break;
                }
             
              //省略无关代码
            
        }

标号①:计算selectDeadLineNanos其根据delayNanos计算得到当前EventLoop的最近将要执行的定时任务的时间作为本次select的截止时间。

标号②:计算当前距离截止时间的毫秒数,其中加上了500000,可以说预留了0.5ms的时间去缓冲其他的代码,从而提升定时任务的实时性。

标号③:会再次进行检查当前是否有等待执行的任务,此时如果不在select阻塞调用之前判断,则会出现提交的任务长时间得不到执行。

标号④:调用带timeoutMillis参数的select方法,进行限期等待IO事件的就绪。

标号⑤:如果满足有就绪的IO事件、select阻塞过程被用户线程唤醒(一般是用户在非本EventLoop线程提交了新的任务,为了避免某个定时任务延迟很长导致timeoutMillis时间过大,允许提交新任务时立即唤醒select)、有待执行的任务、有就绪的定时任务这则会理解跳出本次select,进入正式的IO处理和事件处理流程。

说完了EventLoop的循环,我们在文章开头处介绍EventLoop的时候,也说到EventLoop线程和提交就绪事件的事件源通常不在一个线程,IO事件的话很好理解,操作系统通过中断机制通知内核进而通知到我们用户空间阻塞在select等上的系统调用,那非IO事件的情况下,Netty是如何处理这些事件的入队和通知EventLoop?

如下图所示,当程序在channel上调用execute其实是交给了该channel托管的EventLoop线程执行了,当前线程调用线程不同行为也有所不同。如下图所示:

WX20180519-154158@2x.png

以在channel上调用write进行写数据为例,当我们调用write或者writeAndFlush时,为了达到串行化无锁操作,Netty在执行方法时会判断当前线程是否就是该channel绑定的EventLoop线程。
具体代码可以在io.netty.channel.AbstractChannelHandlerContext#write中找到,

  private void write(Object msg, boolean flush, ChannelPromise promise) {
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            if (!safeExecute(executor, task, promise, m)) {
                task.cancel();
            }
        }
    }
  1. 如果不是,则会继续执行,进行写操作。(虽然这个过程也不一定立刻写到Socket内核缓冲区的)
  2. 如果不是,则会通过safeExecute方法将该write操作当做任务交给EventLoop来处理,safeExecute方法会直接调用NioEventLoop的父类execute方法,将该任务提交到待执行的taskQueue中,并且唤醒EventLoop要求执行。

这部分代码逻辑在NioEventLoop的父类SingleThreadEventExecutor#execute

 @Override
 public void execute(Runnable task) {
        ....
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) { 
            startThread(); // 如果该EventLoop绑定线程没启动
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            // 该方法会立即调用selector.wakeup()方法
            wakeup(inEventLoop);
        }
 }

startThread会在第一次调用EventLoop的execute方法时进行初始化,通过CAS的方式保证只会初始化一次。
并且在满足addTaskWakesUp参数为false时(目前只有在BIO的情况下,该参数为true)以及传入的task类型不属于NonWakeupRunnable的情况下,都会去调用wakeup唤醒selector,从而推动EventLoop执行。

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}

doStartThread方法中就进行调用了我们EventLoop核心的run方法,从而开启了EventLoop这个精密的机器开始工作。

总的流程说下来。NioEventLoop作为reactor的核心,其主要工作可以浓缩成如下伪代码。通过一个线程和任务队列在EvenLoop理念下把Netty和NIO结合到极致。

while True:
    wait_event_ready()         // 等待任意事件就绪,并且能被外界唤醒
    process_io_events(io_events)    // 处理就绪的IO事件
    process_task_and_schedule_task(other_events) //处理该EventLoop托管的一般任务和就绪的定时任务

推荐阅读

EventLoop是什么 https://www.ruanyifeng.com/blog/2013/10/event_loop.html

EventLoop维基百科 https://en.wikipedia.org/wiki/Event_loop

NodeJS EventLoop https://blog.insiderattack.net/event-loop-and-the-big-picture-nodejs-event-loop-part-1-1cb67a182810

concurrency-vs-event-loop-vs-event-loop-concurrency https://medium.com/@tigranbs/concurrency-vs-event-loop-vs-event-loop-concurrency-eb542ad4067b

Netty In Action https://livebook.manning.com/book/netty-in-action/chapter-7/74

zzc-gongzonghao.png

相关文章

网友评论

    本文标题:牛了Netty之EventLoop线程

    本文链接:https://www.haomeiwen.com/subject/grwzmhtx.html