美文网首页Java
牛了Netty之EventLoop线程

牛了Netty之EventLoop线程

作者: 你比天气还晴朗 | 来源:发表于2020-04-16 08:00 被阅读0次

    在使用Netty时,我们老生常谈的是避免在Netty的IO线程中做耗费时间的业务操作和一些长时间的阻塞。本文将通过对Netty中核心类的剖析,帮助读者理解这个问题以及其中的EventLoop设计,从而在使用Netty时更加得心应手。

    用笔者的理解结合EventLoop维基百科的说明,总结如下

    EventLoop是计算机科学中的常用设计模型,其等待事件的就绪并且对事件进行调度分发。EventLoop通常会阻塞等待直到事件就绪。然后调用相关的处理程序。EventLoop通常和Reactor模型一起结合使用。EventLoop并且事件产生源通常和EventLoop不在同一个线程

    EventLoop整个是一个大循环,用伪代码来解释:

    while not shotdown:
        message = get_next_message()
        process_message(message)
    

    EventLoop在计算机系统设计中应用广泛,常用于网络库,例如本文重点介绍的Netty、JS的并发模型也建立在EventLoop之上。

    本文将主要介绍EventLoop在Netty中的实现,帮助大家理解Netty底层的运作机制中核心一环。

    以EventLoop接口中的doc开局,

    Will handle all the I/O operations for a Channel once registered. One EventLoop instance will usually handle more than one Channel but this may depend on implementation details and internals.
    

    其中说到EventLoop通常为多个channel服务,监听并处理这些注册上来的channel的就绪的IO操作。其实在Netty中,不仅是这些channel的IO操作,包括和channel关联的的定时事件也会交由对应的EventLoop进行执行。这一点,后文会分析到。借用<<Netty In Action>>书中的图片来阐述这种关联关系。

    image

    一旦一个 Channel 被分配给一个 EventLoop,该Channel的整个生命周期中都使用这个EventLoop。这种做法带来最显而易见的好处是避免了多个线程操作同一个Channel和该Channel上资源。从而免除很多线程安全问题,实现局部无锁,在消除了锁带来的性能影响之外,也无需开发人员去关心其上的并发问题,降低了使用网络通讯库的开发门槛。

    Netty中的EventLoop有多种实现,有基于select的NioEventLoop、基于Epoll的EpollEventLoop以及macOS下的KQueueEventLoop等。本文以NioEventLoop举例,该类的继承关系如下:

    WX20180519-161049@2x.png

    SingleThreadEventExecutor类中定义了Thread和taskQueue任务队列。该Thread就是该Eventloop绑定的Thread。通过该Thread执行该EventLoop上的所有Channel的IO操作、任务操作。除此之外,可以看到SingleThreadEventExecutor类还往上继承了AbstractScheduledEventExecutor类,该抽象类中通过优先级队列提供了EventLoop对于定时任务的支持,定时任务典型的典型使用场景就是Nettty原生提供的对于心跳的支持类IdleStateHandler,当然扯远了,这并不是本文的重点,之后会再写篇文章详细解剖该类的打开方式以及原理。

    NioEventLooprun方法是Netty Reactor事件模型的主体,其本质就是一个大循环,如下所示:

     @Override
        protected void run() {
            for (;;) {
                try {
                    // 根据当前的selectStrategy进行判断当前是否要进入select环节或者跳过
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false)); // 阻塞等待
                        default:
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {  // ioRatio表示执行IO事件的时间比率,如果设置了100,则会优先执行完所有的IO操作,                                      //  再去
                        try {
                            processSelectedKeys(); //处理IO事件
                        } finally {
                            runAllTasks(); //执行非IO事件
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            final long ioTime = System.nanoTime() - ioStartTime; //计算执行IO事件花费的时间
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //根据比例计算执行非IO事件所应该的时间
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                ... 省略
            }
        }
    

    NioEventLoop通过一个线程干了所有Channel的IO事件和定时事件。这就是文章开头说到的为什么Netty中的handler中或者定时事件中不建议运行长时间的业务逻辑。

    作为一个大轮询,当然也不能让他一直没事干也跑着。这会浪费大量的CPU资源。最好的方法就是我没事干的时候就释放CPU资源让CPU去做别的事,有事干的时候就立马能响应过来恢复到一线。select(wakenUp.getAndSet(false));干的就基本是这事。

       private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // ①
    
                for (;;) {
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; // ②
                    if (timeoutMillis <= 0) {
                        if (selectCnt == 0) {
                            // 立即
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) { // ③
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    int selectedKeys = selector.select(timeoutMillis); // ④
                    selectCnt ++;
    
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                       //⑤
                        break;
                    }
                 
                  //省略无关代码
                
            }
    

    标号①:计算selectDeadLineNanos其根据delayNanos计算得到当前EventLoop的最近将要执行的定时任务的时间作为本次select的截止时间。

    标号②:计算当前距离截止时间的毫秒数,其中加上了500000,可以说预留了0.5ms的时间去缓冲其他的代码,从而提升定时任务的实时性。

    标号③:会再次进行检查当前是否有等待执行的任务,此时如果不在select阻塞调用之前判断,则会出现提交的任务长时间得不到执行。

    标号④:调用带timeoutMillis参数的select方法,进行限期等待IO事件的就绪。

    标号⑤:如果满足有就绪的IO事件、select阻塞过程被用户线程唤醒(一般是用户在非本EventLoop线程提交了新的任务,为了避免某个定时任务延迟很长导致timeoutMillis时间过大,允许提交新任务时立即唤醒select)、有待执行的任务、有就绪的定时任务这则会理解跳出本次select,进入正式的IO处理和事件处理流程。

    说完了EventLoop的循环,我们在文章开头处介绍EventLoop的时候,也说到EventLoop线程和提交就绪事件的事件源通常不在一个线程,IO事件的话很好理解,操作系统通过中断机制通知内核进而通知到我们用户空间阻塞在select等上的系统调用,那非IO事件的情况下,Netty是如何处理这些事件的入队和通知EventLoop?

    如下图所示,当程序在channel上调用execute其实是交给了该channel托管的EventLoop线程执行了,当前线程调用线程不同行为也有所不同。如下图所示:

    WX20180519-154158@2x.png

    以在channel上调用write进行写数据为例,当我们调用write或者writeAndFlush时,为了达到串行化无锁操作,Netty在执行方法时会判断当前线程是否就是该channel绑定的EventLoop线程。
    具体代码可以在io.netty.channel.AbstractChannelHandlerContext#write中找到,

      private void write(Object msg, boolean flush, ChannelPromise promise) {
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                final AbstractWriteTask task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, m, promise);
                }  else {
                    task = WriteTask.newInstance(next, m, promise);
                }
                if (!safeExecute(executor, task, promise, m)) {
                    task.cancel();
                }
            }
        }
    
    1. 如果不是,则会继续执行,进行写操作。(虽然这个过程也不一定立刻写到Socket内核缓冲区的)
    2. 如果不是,则会通过safeExecute方法将该write操作当做任务交给EventLoop来处理,safeExecute方法会直接调用NioEventLoop的父类execute方法,将该任务提交到待执行的taskQueue中,并且唤醒EventLoop要求执行。

    这部分代码逻辑在NioEventLoop的父类SingleThreadEventExecutor#execute

     @Override
     public void execute(Runnable task) {
            ....
            boolean inEventLoop = inEventLoop();
            addTask(task);
            if (!inEventLoop) { 
                startThread(); // 如果该EventLoop绑定线程没启动
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                // 该方法会立即调用selector.wakeup()方法
                wakeup(inEventLoop);
            }
     }
    

    startThread会在第一次调用EventLoop的execute方法时进行初始化,通过CAS的方式保证只会初始化一次。
    并且在满足addTaskWakesUp参数为false时(目前只有在BIO的情况下,该参数为true)以及传入的task类型不属于NonWakeupRunnable的情况下,都会去调用wakeup唤醒selector,从而推动EventLoop执行。

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }
    

    doStartThread方法中就进行调用了我们EventLoop核心的run方法,从而开启了EventLoop这个精密的机器开始工作。

    总的流程说下来。NioEventLoop作为reactor的核心,其主要工作可以浓缩成如下伪代码。通过一个线程和任务队列在EvenLoop理念下把Netty和NIO结合到极致。

    while True:
        wait_event_ready()         // 等待任意事件就绪,并且能被外界唤醒
        process_io_events(io_events)    // 处理就绪的IO事件
        process_task_and_schedule_task(other_events) //处理该EventLoop托管的一般任务和就绪的定时任务
    

    推荐阅读

    EventLoop是什么 https://www.ruanyifeng.com/blog/2013/10/event_loop.html

    EventLoop维基百科 https://en.wikipedia.org/wiki/Event_loop

    NodeJS EventLoop https://blog.insiderattack.net/event-loop-and-the-big-picture-nodejs-event-loop-part-1-1cb67a182810

    concurrency-vs-event-loop-vs-event-loop-concurrency https://medium.com/@tigranbs/concurrency-vs-event-loop-vs-event-loop-concurrency-eb542ad4067b

    Netty In Action https://livebook.manning.com/book/netty-in-action/chapter-7/74

    zzc-gongzonghao.png

    相关文章

      网友评论

        本文标题:牛了Netty之EventLoop线程

        本文链接:https://www.haomeiwen.com/subject/grwzmhtx.html