美文网首页
netty之NioEventLoop事件循环处理

netty之NioEventLoop事件循环处理

作者: hello_kd | 来源:发表于2021-09-30 15:08 被阅读0次

NioEventLoop的事件循环处理,就是在一个死循环中处理IO事件和队列里的任务,并且可以根据策略来平衡这两者之间的执行比例。

protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                   //省略
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
               
            } catch (Throwable t) {
                
            }
        }
    }

首先,先来看下selectStrategy,netty中只有一个默认实现

private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}



@Override
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            selector.wakeup();
        }
    }

这个策略,若是当前有任务,那么返回selectNow()方法的返回值,若是没有任务,则返回SelectStrategy.SELECT(-1)。

因此接下来的swtich语句块中只会有一种情况,就是值为-1时,表示没有任务。但是并不是就进入无限的阻塞状态select()方法中,还会判断队列是否有定时任务要执行,若有,则计算到下一次定时任务的时间间隔,并传给select()方法中,表示超时时间,这个是为了防止一直在select等待,而没有及时的执行定时任务。

这个超时时间还会设置到原子变量nextWakeupNanos中,这样应用程序就可以通过nextWakeupNanos获取到下一次线程唤醒的时间。当线程唤醒后,程序finally会执行nextWakeupNanos.lazySet(AWAKE),表示线程目前是唤醒状态。这个变量的主要作用是当线程阻塞在select方法时,而此时又有任务提交给这个NioEventLoop执行时

private void execute(Runnable task, boolean immediate) {
    //省略...
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            selector.wakeup();
        }
}

唤醒selector时,会先判断inEventLoop,因为若是inEventLoop,就是目前的任务正在被NioEventLoop的线程执行,并没有阻塞在selector的select方法,还有会对nextWakeupNanos的值设置为AWAKE唤醒状态,若该变量值之前就是唤醒的,那么也不会唤醒selector。

现在,把流程又回到刚刚的事件循环run方法中,当select方法返回后,要执行selectKeys和任务时,会先判断ioRatio这个参数,这个表示的是在当前循环中处理IO事件的时间与任务的比例

//执行任务的时间没有限制
if (ioRatio == 100) {
    try {
        if (strategy > 0) {
            processSelectedKeys();
        }
    } finally {
        // Ensure we always run tasks.
        ranTasks = runAllTasks();
    }
} else if (strategy > 0) {
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        final long ioTime = System.nanoTime() - ioStartTime;
            //先计算IO处理的时间,再根据比例计算任务执行的时间
        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
} else {
//若没有IO事件处理,默认执行64个任务
    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}

在每次的循环最后,会判断NioEventLoop是否shutdown了,若关闭了,则将Selector上的key都cancel,并关闭channel。

相关文章

网友评论

      本文标题:netty之NioEventLoop事件循环处理

      本文链接:https://www.haomeiwen.com/subject/ljlenltx.html