美文网首页
Netty源码分析(二) NioEventLoop

Netty源码分析(二) NioEventLoop

作者: skyguard | 来源:发表于2018-11-07 13:41 被阅读0次

    下面我们来分析一下Netty里的又一个关键组件NioEventLoop。
    其实NioEventLoop就是轮询注册在Selector上的相关事件,在AbstractBootstrap里有一个属性是

     /**
     * EventLoopGroup 对象
    */
     volatile EventLoopGroup group;
    

    在ServerBootStrap里有一个属性是

      /**
     * 子 Channel 的 EventLoopGroup 对象
     */
    private volatile EventLoopGroup childGroup;
    

    NioEventLoopGroup就是线程组,包含了NioEventLoop。
    默认是cpu个数的2倍

     /**
     * 默认 EventLoop 线程数
     */
     private static final int DEFAULT_EVENT_LOOP_THREADS;
    
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    

    下面就是NioEventLoop了,先来看看NioEventLoop有哪些东西,NioEventLoop包含了一个Selector和一个TaskQueue,具体结构为


    NioEventLoop结构图

    来看下NioEventLoop都的类结构图


    NioEventLoop

    看起来好像很复杂的样子,其实最主要的就是SingleThreadEventExecutor和NioEventLoop这2个类,先来看下SingleThreadEventExecutor的runAllTasks方法

     protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false; // 是否执行过任务
    
        do {
            // 从定时任务获得到时间的任务
            fetchedAll = fetchFromScheduledTaskQueue();
            // 执行任务队列中的所有任务
            if (runAllTasksFrom(taskQueue)) {
                // 若有任务执行,则标记为 true
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    
        // 如果执行过任务,则设置最后执行时间
        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
    
        // 执行所有任务完成的后续方法
        afterRunningAllTasks();
        return ranAtLeastOne;
    }
    

    你可能会问,这只是处理非io任务的,那selector是怎么处理事件的呢。别急,这就要看NioEventLoop这个类了。在NioEventLoop的processSelectedKeys方法处理了io事件,包括connect,accept,read,write,具体为processSelectedKey方法

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // 如果 SelectionKey 是不合法的,则关闭 Channel
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
    
        try {
            // 获得就绪的 IO 事件的 ops
            int readyOps = k.readyOps();
    
            // OP_CONNECT 事件就绪
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // 移除对 OP_CONNECT 感兴趣
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // 完成连接
                unsafe.finishConnect();
            }
    
            // OP_WRITE 事件就绪
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                // 向 Channel 写入数据
                ch.unsafe().forceFlush();
            }
    
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

    NioEventLoop还有一个定时任务,为AbstractScheduledEventExecutor,这个类有一个定时任务队列,负责执行定时任务

     /**
     * 定时任务队列
     */
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
    

    最后会执行schedule方法

     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(callable, "callable");
        ObjectUtil.checkNotNull(unit, "unit");
        if (delay < 0) {
            delay = 0;
        }
       
    
        return schedule(new ScheduledFutureTask<V>(
                this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }
    

    然后就是NioEventLoop执行run方法,开始轮询各种io事件了。NioEventLoop的分析就到这里了。

    相关文章

      网友评论

          本文标题:Netty源码分析(二) NioEventLoop

          本文链接:https://www.haomeiwen.com/subject/vkgmxqtx.html