netty-event-loop


Author: 高级Java开发 | Published: 2018-07-10 23:53

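    EventLoop is the object in Netty responsible for handling a Channel's IO events. As the name suggests, it is an event loop: once a Channel is registered with an EventLoop, that EventLoop takes over all of the Channel's IO events.
    The class diagram below shows that EventLoop extends EventLoopGroup, and that an EventLoopGroup can hand out an EventLoop through its next() method.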

    [Figure: class diagram of the EventLoop / EventLoopGroup hierarchy]
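To make "a group hands out a loop via next()" concrete, here is a minimal, self-contained sketch. RoundRobinGroup and Loop are hypothetical names, not Netty classes. The bitmask-for-power-of-two, modulo-otherwise choice is similar in spirit to the chooser Netty uses by default, but this is an illustration, not Netty's code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an event loop; it only carries an id here.
final class Loop {
    final int id;
    Loop(int id) { this.id = id; }
}

// Hypothetical group that hands out its loops in round-robin order via next().
final class RoundRobinGroup {
    private final Loop[] loops;
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinGroup(int n) {
        if (n <= 0) throw new IllegalArgumentException("n must be > 0");
        loops = new Loop[n];
        for (int i = 0; i < n; i++) loops[i] = new Loop(i);
    }

    Loop next() {
        int i = idx.getAndIncrement();
        // Power-of-two sizes can use a cheap bitmask; otherwise fall back to modulo.
        // Math.floorMod keeps the index non-negative after the counter overflows.
        return (loops.length & (loops.length - 1)) == 0
                ? loops[i & (loops.length - 1)]
                : loops[Math.floorMod(i, loops.length)];
    }
}
```

Every Channel registered through such a group ends up bound to exactly one loop, which is what later makes the inEventLoop() check meaningful.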

    Today we analyze NioEventLoop.
    NioEventLoop inherits, directly or indirectly, from SingleThreadEventLoop, SingleThreadEventExecutor, AbstractScheduledEventExecutor, AbstractEventExecutor, and the JDK's AbstractExecutorService.

    AbstractEventExecutor
    AbstractEventExecutor overrides newTaskFor from AbstractExecutorService, so submit() returns Netty's own Future instead of the JDK one. Netty's Future is more capable than the JDK's native Future; for example, it supports addListener.

        protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new PromiseTask<T>(this, runnable, value);
        }
    
    

    PromiseTask extends DefaultPromise and implements RunnableFuture. A Promise is a Future that can be written to: we can set its result and mark it as completed (or failed) by hand.
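The difference between a read-only Future and a writable Promise with listeners can be shown with a tiny sketch. MiniPromise is a hypothetical class, not Netty's DefaultPromise: the producer completes it with setSuccess, and listeners registered with addListener run once the result is available, including listeners added after completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified writable future: the result is set by hand and listeners are notified.
final class MiniPromise<V> {
    private V result;
    private boolean done;
    private final List<Consumer<V>> listeners = new ArrayList<>();

    // Completes the promise; returns false if it was already completed.
    boolean setSuccess(V value) {
        List<Consumer<V>> toNotify;
        synchronized (this) {
            if (done) return false;
            result = value;
            done = true;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        // Notify outside the lock so a listener cannot deadlock on this promise.
        for (Consumer<V> l : toNotify) l.accept(value);
        return true;
    }

    // Registers a listener; if the promise is already completed, the listener runs immediately.
    void addListener(Consumer<V> listener) {
        V value;
        synchronized (this) {
            if (!done) {
                listeners.add(listener);
                return;
            }
            value = result;
        }
        listener.accept(value);
    }

    synchronized boolean isDone() { return done; }
}
```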

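    Another frequently used method in AbstractEventExecutor is inEventLoop.
    It checks whether the current thread is the event loop's thread. To keep the event handling of a Channel thread-safe, all IO events and related work must run on the IO thread, so code first checks whether it is already on the EventLoop thread: if it is, the work runs directly; otherwise it is submitted to the EventLoop thread.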

        public boolean inEventLoop() {
            return inEventLoop(Thread.currentThread());
        }
    

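    inEventLoop(Thread) is an abstract method declared in EventExecutor. SingleThreadEventExecutor, which NioEventLoop inherits from, implements it by comparing the argument with its own thread, since a single-threaded executor has exactly one thread.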

        public boolean inEventLoop(Thread thread) {
            return thread == this.thread;
        }
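The "run directly if on the loop thread, otherwise hand it over" rule can be sketched with one worker thread. MiniLoop and runOrSubmit are hypothetical names, not Netty's SingleThreadEventExecutor; the loop only remembers its own thread and uses a LinkedBlockingQueue as its task queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-threaded loop illustrating the inEventLoop() check.
final class MiniLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Thread thread;

    MiniLoop() {
        thread = new Thread(() -> {
            try {
                for (;;) taskQueue.take().run();   // run queued tasks one by one
            } catch (InterruptedException e) {
                // interrupted by stop(): leave the loop
            }
        }, "mini-loop");
        thread.setDaemon(true);
        thread.start();
    }

    boolean inEventLoop(Thread t) { return t == thread; }

    boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }

    // Runs the task inline when already on the loop thread, otherwise enqueues it for that thread.
    void runOrSubmit(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            taskQueue.add(task);
        }
    }

    void stop() { thread.interrupt(); }
}
```

Because every task for this loop ends up on the same thread, state touched only from those tasks needs no locking, which is the reason the pattern exists.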
    

    The remaining methods of AbstractEventExecutor are helpers such as shutdown.

    AbstractScheduledEventExecutor

    AbstractScheduledEventExecutor extends AbstractEventExecutor and adds the schedule-related methods on top of it.

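    Scheduled tasks are stored in a heap-based PriorityQueue whose elements are ScheduledFutureTask objects.
    deadlineNanos is the point in time at which the task should run. periodNanos controls repetition: 0 means no repeat, a positive value means repeat at a fixed rate, and a negative value means repeat with a fixed delay.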

        private final long id = nextTaskId.getAndIncrement();
        private long deadlineNanos;
        /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
        private final long periodNanos;
    
        private int queueIndex = INDEX_NOT_IN_QUEUE;
    
        @Override
        public int compareTo(Delayed o) {
            if (this == o) {
                return 0;
            }
    
            ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
            long d = deadlineNanos() - that.deadlineNanos();
            if (d < 0) {
                return -1;
            } else if (d > 0) {
                return 1;
            } else if (id < that.id) {
                return -1;
            } else if (id == that.id) {
                throw new Error();
            } else {
                return 1;
            }
        }
    
        @Override
        public void run() {
            assert executor().inEventLoop();
            try {
                if (periodNanos == 0) {
                    if (setUncancellableInternal()) {
                        V result = task.call();
                        setSuccessInternal(result);
                    }
                } else {
                    // check if is done as it may was cancelled
                    if (!isCancelled()) {
                        task.call();
                        if (!executor().isShutdown()) {
                            long p = periodNanos;
                            if (p > 0) {
                                deadlineNanos += p;
                            } else {
                                deadlineNanos = nanoTime() - p;
                            }
                            if (!isCancelled()) {
                                // scheduledTaskQueue can never be null as we lazy init it before submit the task!
                                Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
                                        ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
                                assert scheduledTaskQueue != null;
                                scheduledTaskQueue.add(this);
                            }
                        }
                    }
                }
            } catch (Throwable cause) {
                setFailureInternal(cause);
            }
        }
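The ordering rule in compareTo and the two repeat modes in run() can be checked in isolation. Entry is a hypothetical stand-in for ScheduledFutureTask that keeps only id, deadlineNanos and periodNanos; it compares with Long.compare instead of subtraction, and nextDeadline mirrors the `p > 0` / else branch above.

```java
// Hypothetical stand-in for ScheduledFutureTask: earliest deadline first, ties broken by id.
final class Entry implements Comparable<Entry> {
    final long id;
    long deadlineNanos;
    final long periodNanos; // 0: no repeat, >0: fixed rate, <0: fixed delay

    Entry(long id, long deadlineNanos, long periodNanos) {
        this.id = id;
        this.deadlineNanos = deadlineNanos;
        this.periodNanos = periodNanos;
    }

    @Override
    public int compareTo(Entry that) {
        int byDeadline = Long.compare(deadlineNanos, that.deadlineNanos);
        return byDeadline != 0 ? byDeadline : Long.compare(id, that.id);
    }

    // Next deadline after a run that finished at nowNanos; only defined for repeating tasks.
    long nextDeadline(long nowNanos) {
        if (periodNanos == 0) throw new IllegalStateException("not a repeating task");
        if (periodNanos > 0) {
            return deadlineNanos + periodNanos;   // fixed rate: based on the previous deadline
        }
        return nowNanos - periodNanos;            // fixed delay: based on when the run ended
    }
}
```

Putting Entry objects into a java.util.PriorityQueue yields them in deadline order, with the lower id first on ties; the id tie-breaker keeps tasks with the same deadline in submission order.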
    
    

    AbstractScheduledEventExecutor also provides methods for adding and fetching scheduled tasks:

        protected final Runnable pollScheduledTask(long nanoTime) {
            assert inEventLoop();
    
            Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
            ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
            if (scheduledTask == null) {
                return null;
            }
    
            if (scheduledTask.deadlineNanos() <= nanoTime) {
                scheduledTaskQueue.remove();
                return scheduledTask;
            }
            return null;
        }
    
        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            ObjectUtil.checkNotNull(callable, "callable");
            ObjectUtil.checkNotNull(unit, "unit");
            if (delay < 0) {
                delay = 0;
            }
            validateScheduled0(delay, unit);
    
            return schedule(new ScheduledFutureTask<V>(
                    this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
        }
    
        <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
            if (inEventLoop()) {
                scheduledTaskQueue().add(task);
            } else {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        scheduledTaskQueue().add(task);
                    }
                });
            }
    
            return task;
        }
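pollScheduledTask only removes the head of the queue when its deadline has passed. A task that is not yet due stays where it is, and because the head has the earliest deadline, nothing behind it can be due either. A sketch with java.util.PriorityQueue and hypothetical Timed / ScheduledQueue types (not Netty's):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical scheduled item: a name plus an absolute deadline in nanoseconds.
final class Timed {
    final String name;
    final long deadlineNanos;
    Timed(String name, long deadlineNanos) { this.name = name; this.deadlineNanos = deadlineNanos; }
}

// Hypothetical deadline-ordered queue with a pollScheduledTask-style accessor.
final class ScheduledQueue {
    private final PriorityQueue<Timed> queue =
            new PriorityQueue<>(Comparator.comparingLong((Timed t) -> t.deadlineNanos));

    void add(Timed t) { queue.add(t); }

    // Returns the head only if it is due at nanoTime; otherwise leaves the queue untouched.
    Timed pollDue(long nanoTime) {
        Timed head = queue.peek();
        if (head == null || head.deadlineNanos > nanoTime) {
            return null;
        }
        return queue.poll();
    }
}
```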
    

    SingleThreadEventExecutor

    SingleThreadEventExecutor extends AbstractScheduledEventExecutor; as the name says, it is a single-threaded implementation.

        public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    
        private static final int ST_NOT_STARTED = 1;
        private static final int ST_STARTED = 2;
        private static final int ST_SHUTTING_DOWN = 3;
        private static final int ST_SHUTDOWN = 4;
        private static final int ST_TERMINATED = 5;
    
        private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
        private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
                AtomicReferenceFieldUpdater.newUpdater(
                        SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
    
        private volatile ThreadProperties threadProperties;
        private volatile int state = ST_NOT_STARTED;
    

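    SingleThreadEventExecutor defines a state field that holds the executor's state: NOT_STARTED, STARTED, SHUTTING_DOWN, SHUTDOWN or TERMINATED.
    SHUTDOWN is the state entered once a shutdown requested by the user has completed.
    TERMINATED is the final state, reached when the executor stops either because of an exception or after its tasks have run to completion.
    Note that AtomicIntegerFieldUpdater and AtomicReferenceFieldUpdater are combined with volatile fields here to get the atomic operations of AtomicInteger and AtomicReference. The benefit is lower memory use: every AtomicInteger or AtomicReference is a separate object with its own object header, data field and padding, whereas a field updater is a single static instance shared by all executors that operates directly on a plain volatile field.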
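A minimal sketch of the field-updater pattern, reusing the ST_NOT_STARTED / ST_STARTED values shown above. Worker and tryStart are hypothetical names; the point is that the state lives in a plain `volatile int` field, and one static AtomicIntegerFieldUpdater shared by all instances performs the compare-and-set, so no extra AtomicInteger object is allocated per instance.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical executor-like class that tracks its lifecycle state with a field updater.
final class Worker {
    static final int ST_NOT_STARTED = 1;
    static final int ST_STARTED = 2;

    // One updater per class, shared by all instances; the target must be a non-static volatile int.
    private static final AtomicIntegerFieldUpdater<Worker> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(Worker.class, "state");

    private volatile int state = ST_NOT_STARTED;

    // Only the first caller wins the NOT_STARTED -> STARTED transition.
    boolean tryStart() {
        return STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED);
    }

    int state() { return state; }
}
```

This is the kind of transition that makes sure an executor's thread is started only once, even if several threads submit the first task at the same moment.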

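    taskQueue holds the tasks submitted to the executor; due tasks taken out of scheduledTaskQueue are also moved into it before they run.
    thread is the single thread of this SingleThreadEventExecutor.
    executor is the Executor used to start that thread; for NioEventLoop it is, by default, a ThreadPerTaskExecutor.
    maxPendingTasks bounds the size of taskQueue.
    rejectedExecutionHandler decides what happens when the queue is full.

        private final Queue<Runnable> taskQueue;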
    
        private volatile Thread thread;
        @SuppressWarnings("unused")
        private volatile ThreadProperties threadProperties;
        private final Executor executor;
        private volatile boolean interrupted;
    
        private final Semaphore threadLock = new Semaphore(0);
        private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
        private final boolean addTaskWakesUp;
        private final int maxPendingTasks;
        private final RejectedExecutionHandler rejectedExecutionHandler;
    
        private long lastExecutionTime;
    
        @SuppressWarnings({ "FieldMayBeFinal", "unused" })
        private volatile int state = ST_NOT_STARTED;
    
        private volatile long gracefulShutdownQuietPeriod;
        private volatile long gracefulShutdownTimeout;
        private long gracefulShutdownStartTime;
    
        private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
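How maxPendingTasks and rejectedExecutionHandler fit together can be sketched as: offer the task to a bounded queue, and if the offer fails, hand the task to a rejection callback instead of blocking the caller. BoundedTasks and its Rejector interface are hypothetical; Netty's own RejectedExecutionHandler interface has a different signature.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bounded task queue with a pluggable rejection policy.
final class BoundedTasks {
    interface Rejector {
        void rejected(Runnable task);
    }

    private final BlockingQueue<Runnable> taskQueue;
    private final Rejector rejector;

    BoundedTasks(int maxPendingTasks, Rejector rejector) {
        this.taskQueue = new ArrayBlockingQueue<>(maxPendingTasks);
        this.rejector = rejector;
    }

    // Non-blocking add: a full queue triggers the rejection policy.
    void addTask(Runnable task) {
        if (!taskQueue.offer(task)) {
            rejector.rejected(task);
        }
    }

    int pending() { return taskQueue.size(); }
}
```

Using offer() rather than put() matters here: a caller outside the loop must never block waiting for the loop to drain its queue.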
    
    

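    runAllTasks takes the due tasks out of scheduledTaskQueue and tries to put them into taskQueue; if taskQueue is full, the task is put back into scheduledTaskQueue.
    It then runs everything in taskQueue, repeating this fetch-and-run cycle until all due scheduled tasks have been fetched.

        protected boolean runAllTasks() {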
            assert inEventLoop();
            boolean fetchedAll;
            boolean ranAtLeastOne = false;
    
            do {
                fetchedAll = fetchFromScheduledTaskQueue();
                if (runAllTasksFrom(taskQueue)) {
                    ranAtLeastOne = true;
                }
            } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    
            if (ranAtLeastOne) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
            }
            afterRunningAllTasks();
            return ranAtLeastOne;
        }
    
        private boolean fetchFromScheduledTaskQueue() {
            long nanoTime = AbstractScheduledEventExecutor.nanoTime();
            Runnable scheduledTask  = pollScheduledTask(nanoTime);
            while (scheduledTask != null) {
                if (!taskQueue.offer(scheduledTask)) {
                    // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                    scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                    return false;
                }
                scheduledTask  = pollScheduledTask(nanoTime);
            }
            return true;
        }
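The fetch-and-run cycle of runAllTasks can be reproduced with JDK collections. In this sketch (hypothetical names, not Netty's code), due tasks move from a deadline-ordered PriorityQueue into a bounded ArrayBlockingQueue; when the bounded queue is full, the task goes back to the scheduled queue, the current batch runs, and the loop fetches again until nothing due is left.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model of SingleThreadEventExecutor.runAllTasks(); names are illustrative.
final class TaskRunner {
    // A task with an absolute deadline; only the deadline matters for ordering.
    static final class Due {
        final long deadlineNanos;
        final Runnable body;
        Due(long deadlineNanos, Runnable body) { this.deadlineNanos = deadlineNanos; this.body = body; }
    }

    final PriorityQueue<Due> scheduled =
            new PriorityQueue<>(Comparator.comparingLong((Due d) -> d.deadlineNanos));
    final Queue<Runnable> taskQueue;

    TaskRunner(int capacity) { taskQueue = new ArrayBlockingQueue<>(capacity); }

    // Moves due tasks into taskQueue; returns false if it stopped because taskQueue was full.
    boolean fetchFromScheduled(long now) {
        Due d;
        while ((d = scheduled.peek()) != null && d.deadlineNanos <= now) {
            scheduled.poll();
            if (!taskQueue.offer(d.body)) {
                scheduled.add(d);   // no room: put it back, it will be fetched next round
                return false;
            }
        }
        return true;
    }

    // Keeps fetching and draining until every due task has been moved and run.
    boolean runAllTasks(long now) {
        boolean ranAtLeastOne = false;
        boolean fetchedAll;
        do {
            fetchedAll = fetchFromScheduled(now);
            Runnable t;
            while ((t = taskQueue.poll()) != null) {
                t.run();
                ranAtLeastOne = true;
            }
        } while (!fetchedAll);
        return ranAtLeastOne;
    }
}
```

With a capacity of 2 and five due tasks, the loop runs them in three batches (2, 2, 1) and still preserves deadline order.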
    

    NioEventLoop

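    NioEventLoop implements the NIO-specific operations: it registers Channels with a Selector and multiplexes them on the event loop. It also contains special handling that works around the JDK NIO selector bug in which select() keeps returning immediately with nothing selected, spinning the CPU.
    ioRatio controls the percentage of loop time given to IO operations (the rest goes to non-IO tasks); the default is 50.

        private final SelectStrategy selectStrategy;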
    
        private volatile int ioRatio = 50;
        private int cancelledKeys;
        private boolean needsToSelectAgain;
    
        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }
    

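    The run() method that SingleThreadEventExecutor leaves for subclasses is implemented in NioEventLoop as a single infinite loop. Each iteration:

    1. asks selectStrategy whether to do a blocking select, and performs it if so;
    2. processes the selected keys;
    3. runs the queued tasks, with a time budget derived from ioRatio unless ioRatio is 100;
    4. checks whether the loop is shutting down and, if so, closes all channels and returns once shutdown is confirmed.
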
        for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    
                            // 'wakenUp.compareAndSet(false, true)' is always evaluated
                            // before calling 'selector.wakeup()' to reduce the wake-up
                            // overhead. (Selector.wakeup() is an expensive operation.)
                            //
                            // However, there is a race condition in this approach.
                            // The race condition is triggered when 'wakenUp' is set to
                            // true too early.
                            //
                            // 'wakenUp' is set to true too early if:
                            // 1) Selector is waken up between 'wakenUp.set(false)' and
                            //    'selector.select(...)'. (BAD)
                            // 2) Selector is waken up between 'selector.select(...)' and
                            //    'if (wakenUp.get()) { ... }'. (OK)
                            //
                            // In the first case, 'wakenUp' is set to true and the
                            // following 'selector.select(...)' will wake up immediately.
                            // Until 'wakenUp' is set to false again in the next round,
                            // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                            // any attempt to wake up the Selector will fail, too, causing
                            // the following 'selector.select(...)' call to block
                            // unnecessarily.
                            //
                            // To fix this problem, we wake up the selector again if wakenUp
                            // is true immediately after selector.select(...).
                            // It is inefficient in that it wakes up the selector for both
                            // the first case (BAD - wake-up required) and the second case
                            // (OK - no wake-up required).
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                            // fall through
                        default:
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
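Two decisions in this loop are plain arithmetic and can be checked on their own. The sketch below uses hypothetical helper names. calculateStrategy follows the logic of Netty's DefaultSelectStrategy: do a blocking select only when no tasks are pending, otherwise use the non-blocking selectNow() result (a key count of 0 or more, which falls through to key processing). taskBudgetNanos is the `ioTime * (100 - ioRatio) / ioRatio` formula: with the default ioRatio of 50, tasks get as much time as the IO phase just took; with 80, a quarter of it.

```java
import java.util.function.IntSupplier;

// Hypothetical helpers mirroring two decisions in NioEventLoop.run().
final class LoopMath {
    static final int SELECT = -1;   // local marker meaning "do a blocking select"

    // Block only when no tasks are waiting; otherwise take selectNow()'s key count (>= 0).
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    // Time budget (ns) for non-IO tasks after an IO phase that took ioTimeNanos.
    // ioRatio == 100 is handled separately in run(): all tasks run with no budget.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

Skipping the blocking select when tasks are pending is what keeps a task submitted from another thread from waiting for the full select timeout.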
    

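    select works as follows:

    1. On entry, record the current time and compute how long remains until the nearest scheduled task is due (one second if there is none); this gives the select deadline.
    2. Call select in a for loop; each call increments the counter selectCnt. The loop exits when the deadline has passed, a key was selected, the loop was woken up, a task is pending or a scheduled task is due, or the thread was interrupted.
    3. If select returned before its timeout elapsed and none of those conditions holds, check whether selectCnt has reached the threshold (SELECTOR_AUTO_REBUILD_THRESHOLD); if so, call rebuildSelector() to work around the JDK bug where select() stops blocking.

        private void select(boolean oldWakenUp) throws IOException {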
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    
                for (;;) {
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {
                        if (selectCnt == 0) {
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                    // Selector#wakeup. So we need to check task queue again before executing select operation.
                    // If we don't, the task might be pended until select operation was timed out.
                    // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;
    
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    if (Thread.interrupted()) {
                        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                        // As this is most likely a bug in the handler of the user or it's client library we will
                        // also log it.
                        //
                        // See https://github.com/netty/netty/issues/2426
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely because " +
                                    "Thread.currentThread().interrupt() was called. Use " +
                                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                        }
                        selectCnt = 1;
                        break;
                    }
    
                    long time = System.nanoTime();
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        // The selector returned prematurely many times in a row.
                        // Rebuild the selector to work around the problem.
                        logger.warn(
                                "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                                selectCnt, selector);
    
                        rebuildSelector();
                        selector = this.selector;
    
                        // Select again to populate selectedKeys.
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    currentTimeNanos = time;
                }
    
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
                // Harmless exception - log anyway
            }
        }
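The bookkeeping in select() can be isolated into a small state machine: round the remaining time to the nearest millisecond, count consecutive select() calls that returned early with nothing to do, and ask for a rebuild once the count reaches a threshold. PrematureSelectDetector is hypothetical; in Netty 4.1 the threshold is SELECTOR_AUTO_REBUILD_THRESHOLD (set through the io.netty.selectorAutoRebuildThreshold system property, 512 by default), while here it is just a constructor argument.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical extraction of the premature-return bookkeeping in NioEventLoop.select().
final class PrematureSelectDetector {
    private final int rebuildThreshold;
    private int selectCnt;

    PrematureSelectDetector(int rebuildThreshold) { this.rebuildThreshold = rebuildThreshold; }

    // Remaining time rounded to the nearest millisecond, as in select().
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }

    // Records one select(timeoutMillis) call that returned with nothing to do.
    // Returns true when the selector should be rebuilt.
    boolean onEmptyReturn(long startNanos, long endNanos, long timeoutMillis) {
        selectCnt++;
        if (endNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= startNanos) {
            selectCnt = 1;          // the full timeout elapsed: a normal return, not premature
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            selectCnt = 1;          // reset after deciding to rebuild
            return true;
        }
        return false;
    }
}
```

Only a run of early, empty returns triggers a rebuild; a select() that blocked for its whole timeout resets the count, so an idle but healthy selector is never rebuilt.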
    

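    After select, the selected keys are processed: iterate over the SelectionKey set, remove each key, determine its type from the key's attachment (an AbstractNioChannel or a NioTask), and process it accordingly.

        private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {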
            // check if the set is empty and if so just return to not create garbage by
            // creating a new Iterator every time even if there is nothing to process.
            // See https://github.com/netty/netty/issues/597
            if (selectedKeys.isEmpty()) {
                return;
            }
    
            Iterator<SelectionKey> i = selectedKeys.iterator();
            for (;;) {
                final SelectionKey k = i.next();
                final Object a = k.attachment();
                i.remove();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (!i.hasNext()) {
                    break;
                }
    
                if (needsToSelectAgain) {
                    selectAgain();
                    selectedKeys = selector.selectedKeys();
    
                    // Create the iterator again to avoid ConcurrentModificationException
                    if (selectedKeys.isEmpty()) {
                        break;
                    } else {
                        i = selectedKeys.iterator();
                    }
                }
            }
        }
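The iterate, remove and dispatch-on-attachment pattern of processSelectedKeysPlain also works with the bare JDK API. This sketch (hypothetical class name; the loopback address and an ephemeral port are choices made for the example) registers a ServerSocketChannel with a string attachment, connects a client, and handles the selected keys by removing each from the set before looking at its attachment.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical demo: one select round on a loopback server, dispatching on the key attachment.
final class SelectOnce {
    // Returns how many connections were accepted during a single select round.
    static int acceptOnce() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));            // port 0: any free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT, "acceptor");  // attachment tags the role

            int accepted = 0;
            // Blocking client connect; it completes once the server socket is listening.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                if (selector.select(5_000) == 0) {
                    return 0;                                              // nothing became ready in time
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey k = it.next();
                    it.remove();                   // the Selector never clears this set by itself
                    Object a = k.attachment();
                    if ("acceptor".equals(a) && k.isValid() && k.isAcceptable()) {
                        try (SocketChannel ch = ((ServerSocketChannel) k.channel()).accept()) {
                            if (ch != null) accepted++;
                        }
                    }
                }
            }
            return accepted;
        }
    }
}
```

Forgetting `it.remove()` is a classic NIO mistake: the key would stay in the selected set and be processed again on the next round.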
    

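    processSelectedKey first checks that the key is still valid; if it is not, the channel is closed, but only if it is still registered with this EventLoop. Otherwise it looks at readyOps: OP_CONNECT is handled first with finishConnect(); if the channel is writable, it flushes through the AbstractNioChannel's unsafe (forceFlush()); if it is readable or acceptable, it reads through unsafe as well.
    The read and write operations of unsafe involve the pipeline, which will be analyzed in a later chapter.

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {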
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
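The order of checks in processSelectedKey can be captured as a pure function over the readyOps bits. ReadyOpsDispatch and the returned step names are hypothetical; only the SelectionKey.OP_* constants are the real JDK ones. The function lists what would run: finish the connect first, flush pending writes next, then read (or accept), with readyOps == 0 also treated as a read, as the comment in the code above explains.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical: names the steps processSelectedKey() would take for a given readyOps value.
final class ReadyOpsDispatch {
    static List<String> steps(int readyOps) {
        List<String> steps = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            steps.add("finishConnect");   // the real code also clears OP_CONNECT from interestOps
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            steps.add("forceFlush");
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            steps.add("read");
        }
        return steps;
    }
}
```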
    
    

    With the analysis above, the overall flow and implementation of Netty's NIO EventLoop should now be roughly clear.

    Article link: https://www.haomeiwen.com/subject/zlaxpftx.html