Netty Series (3): EventLoop and EventLoop


Author: 康康不遛猫 | Published 2018-12-28 18:21

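    I. Thread model

    Netty's thread model

    Questions

    1. By default, how many threads does a Netty server start, and when are they started?
    2. How does Netty work around the JDK empty-polling bug?
    3. How does Netty keep asynchronous processing serial and lock-free?

    Process

    1. NioEventLoop creation
    2. NioEventLoop startup
    3. NioEventLoop execution logic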

    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workerGroup = new NioEventLoopGroup();//handles I/O reads and writes, and runs tasks
    

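    When the server starts, it creates two NioEventLoopGroup instances, which are in effect two independent Reactor thread pools. One accepts TCP connections from clients; the other handles I/O reads and writes and runs system tasks, scheduled tasks, and so on.
    The thread pool that accepts client requests is responsible for:

    • accepting client TCP connections and initializing the Channel parameters;
    • notifying the ChannelPipeline of link state change events.

    The Reactor thread pool that handles I/O is responsible for:

    • asynchronously reading data from the peer and firing read events into the ChannelPipeline;
    • asynchronously sending messages to the peer by calling the ChannelPipeline's send interface;
    • running system-call tasks;
    • running scheduled tasks, such as the link idle-state check.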

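    To maximize performance, Netty uses lock-free designs in many places, for example by running operations serially inside each I/O thread to avoid the slowdown caused by multi-thread contention. On the surface, serial execution seems to underuse the CPU and limit concurrency. However, by tuning the thread count of the NIO thread pool, several serial threads can run in parallel, and this locally lock-free, serial-thread design performs better than a model with one queue and multiple worker threads.
    After a NioEventLoop reads a message, it calls fireChannelRead(Object msg) on the ChannelPipeline directly. Unless the user explicitly switches threads, the NioEventLoop keeps invoking the user's Handlers itself, with no thread switch along the way. This serial processing avoids the lock contention that multi-threaded access would cause, and from a performance standpoint it is the best choice.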
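The serial, lock-free idea can be illustrated without Netty. The sketch below is an assumption-laden stand-in, not Netty code: plain JDK single-thread executors play the role of NioEventLoops, and the class and method names (`SerialLoopsSketch`, `ChannelState`, `run`) are hypothetical. Each "channel" is pinned to one loop, so its state is only ever touched by a single thread and needs no lock.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration (plain JDK, not Netty): several serial loops run in
// parallel, and every channel is always served by the same loop.
final class SerialLoopsSketch {

    // Per-channel state; only the channel's own loop thread ever modifies it.
    static final class ChannelState {
        long handled; // deliberately neither volatile nor synchronized
    }

    // Fires eventsPerChannel events at every channel and returns the per-channel totals.
    static long[] run(int loops, int channels, int eventsPerChannel) {
        ExecutorService[] group = new ExecutorService[loops];
        for (int i = 0; i < loops; i++) {
            group[i] = Executors.newSingleThreadExecutor();
        }
        ChannelState[] states = new ChannelState[channels];
        for (int c = 0; c < channels; c++) {
            states[c] = new ChannelState();
        }
        try {
            for (int e = 0; e < eventsPerChannel; e++) {
                for (int c = 0; c < channels; c++) {
                    final ChannelState state = states[c];
                    // Same channel, same loop: its events are processed one after another.
                    group[c % loops].execute(() -> state.handled++);
                }
            }
            long[] totals = new long[channels];
            for (int c = 0; c < channels; c++) {
                final ChannelState state = states[c];
                // Read the counter on the owning loop; Future.get() publishes the value safely.
                Future<Long> total = group[c % loops].submit(() -> state.handled);
                totals[c] = total.get();
            }
            return totals;
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            for (ExecutorService loop : group) {
                loop.shutdown();
            }
        }
    }
}
```

With 4 loops and 8 channels, every channel ends up with exactly `eventsPerChannel` handled events even though no lock or atomic is used for the counter, because no two threads ever share a channel.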
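    Recommendations for using Netty threads:

    (1) Create two NioEventLoopGroup instances to logically separate the NIO acceptor threads from the NIO I/O threads.
    (2) Avoid starting user threads inside a ChannelHandler (except to dispatch decoded POJO messages to back-end business threads).
    (3) Decode in the decoding Handler invoked by the NIO thread; do not switch to a user thread to decode messages.
    (4) If the business logic is very simple, with no heavy computation and nothing that may block the thread such as disk, database, or network operations, it can run directly on the NIO thread without switching to a user thread.
    (5) If the business logic is complex, do not run it on the NIO thread. Wrap the decoded POJO message in a Task and dispatch it to a business thread pool, so that the NIO thread is released quickly to handle other I/O. Two formulas are recommended for sizing the thread pool.
    • Formula 1: thread count = (total thread time / bottleneck resource time) × number of threads the bottleneck resource can run in parallel.
    • Formula 2: QPS = 1000 / total thread time × thread count, with the total thread time in milliseconds.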
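A small worked example may make the two formulas concrete. The numbers and the names `ThreadSizingSketch`, `threadCount`, and `qps` are hypothetical, the times are assumed to be in milliseconds, and rounding the thread count up is an assumption of this sketch rather than part of the formula.

```java
// Hypothetical helper that evaluates the two sizing formulas.
final class ThreadSizingSketch {

    // Formula 1: (total thread time / bottleneck time) x bottleneck parallelism.
    // Rounding up is an assumption made here so that the result is a whole thread.
    static int threadCount(double totalTimeMs, double bottleneckTimeMs, int bottleneckParallelism) {
        return (int) Math.ceil(totalTimeMs / bottleneckTimeMs * bottleneckParallelism);
    }

    // Formula 2: QPS = 1000 / total thread time (ms) x thread count.
    static double qps(double totalTimeMs, int threads) {
        return 1000.0 / totalTimeMs * threads;
    }

    public static void main(String[] args) {
        // Assumed workload: 50 ms per request, of which 10 ms is CPU (the bottleneck), on 4 cores.
        int threads = threadCount(50, 10, 4);
        System.out.println(threads);          // 20
        System.out.println(qps(50, threads)); // 400.0
    }
}
```

Under these assumed numbers, a request that spends 50 ms in the thread but only 10 ms on the 4-way parallel bottleneck calls for 20 threads, which gives a theoretical ceiling of 400 requests per second.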

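    II. NioEventLoop source code analysis

    [Figure: NioEventLoop class diagram]

    1. NioEventLoop creation

    NioEventLoop creation steps:

    • new ThreadPerTaskExecutor() [thread creator]: the executor that creates the underlying threads for the NioEventLoopGroup
    • for(){newChild()} [build the NioEventLoops]: allocate the NioEventLoop array, then loop to create each NioEventLoop, calling newChild() to configure its core parameters
    • chooserFactory.newChooser() [thread chooser]: assigns a NioEventLoop thread to each new connection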

    [Figures: chooserFactory.newChooser; PowerOfTwoEventExecutorChooser.next]

    (1) NioEventLoopGroup and the ThreadPerTaskExecutor thread creator

    public NioEventLoopGroup() {
            this(0);
    }
    
    public NioEventLoopGroup(int nThreads) {
            this(nThreads, (Executor) null);
    }
    
    //The group's default thread count is 2 x the number of CPU cores
    //DEFAULT_EVENT_LOOP_THREADS=2*Runtime.getRuntime().availableProcessors(), unless overridden with -Dio.netty.eventLoopThreads
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    #MultithreadEventExecutorGroup class, core constructor
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                //thread creator: creates the underlying threads for the NioEventLoopGroup
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];//allocate the NioEventLoop array
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    //create each NioEventLoop in the loop; newChild() configures its core parameters
                    children[i] = newChild(executor, args);//newChild
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
            //thread chooser: assigns a NioEventLoop thread to each new connection
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
      }
        
    #Threads are created through the threadFactory
    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            //every execute() call creates one new thread; newDefaultThreadFactory() names the threads nioEventLoopGroup-<poolId>-<threadId>
            threadFactory.newThread(command).start();
        }
    }
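To see the thread-per-task behaviour in isolation, here is a minimal JDK-only sketch. It does not use Netty's DefaultThreadFactory; the thread-name prefix `sketchLoop-1-` and the names `ThreadPerTaskSketch` and `runTasks` are made up for the illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: an Executor that starts one brand-new thread per submitted task.
final class ThreadPerTaskSketch {

    // Runs the given number of tasks and returns the names of the threads that ran them.
    static Set<String> runTasks(int tasks) {
        AtomicInteger nextId = new AtomicInteger();
        // Stand-in for newDefaultThreadFactory(): it only assigns a numbered name.
        ThreadFactory factory = r -> new Thread(r, "sketchLoop-1-" + nextId.incrementAndGet());
        // Same shape as ThreadPerTaskExecutor.execute(): create a thread, start it.
        Executor executor = command -> factory.newThread(command).start();

        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }
}
```

Three tasks produce three distinct thread names. In the listing above this does not lead to unbounded thread creation, because each NioEventLoop hands exactly one long-running task to the executor when it starts, so each loop ends up with one thread.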
    

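    (2) newChild(): creating the NioEventLoop

    • Store the thread executor (ThreadPerTaskExecutor);
    • Create an MpscQueue (multi-producer, single-consumer): taskQueue is used when an external thread runs a Netty task. If the caller is not on the NioEventLoop's own thread, the task is put into the queue and executed later by that thread.
      PlatformDependent.newMpscQueue(maxPendingTasks) creates the MpscQueue that holds these asynchronous tasks;
    • Create a selector: provider.openSelector() opens the selector that polls the connections registered on it
    #NioEventLoopGroup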
    @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }
    
        
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);//parent constructor
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }
        
    #SingleThreadEventExecutor class, parent constructor
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, int maxPendingTasks,
                                            RejectedExecutionHandler rejectedHandler) {
            super(parent);
            this.addTaskWakesUp = addTaskWakesUp;
            this.maxPendingTasks = Math.max(16, maxPendingTasks);
            this.executor = ObjectUtil.checkNotNull(executor, "executor");
            taskQueue = newTaskQueue(this.maxPendingTasks);//task queue: external threads put their tasks here
            rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
        }
        
    //task queue
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
            // This event loop never calls takeTask()
            return PlatformDependent.newMpscQueue(maxPendingTasks);
    }
        
    private SelectorTuple openSelector() {
            final Selector unwrappedSelector;
            try {
                unwrappedSelector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEYSET_OPTIMIZATION) {
                return new SelectorTuple(unwrappedSelector);
            }
    
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();//a Set backed by an array
    
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                    } catch (Throwable cause) {
                        return cause;
                    }
                }
            });
    
            if (!(maybeSelectorImplClass instanceof Class) ||
                    // ensure the current selector implementation is what we can instrument.
                    !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
                if (maybeSelectorImplClass instanceof Throwable) {
                    Throwable t = (Throwable) maybeSelectorImplClass;
                    logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
                }
                return new SelectorTuple(unwrappedSelector);
            }
    
            final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
            //install selectedKeySet into the selector through reflection
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                        Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
                        if (cause != null) {
                            return cause;
                        }
                        cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
                        if (cause != null) {
                            return cause;
                        }
    
                        selectedKeysField.set(unwrappedSelector, selectedKeySet);
                        publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                        return null;
                    } catch (NoSuchFieldException e) {
                        return e;
                    } catch (IllegalAccessException e) {
                        return e;
                    }
                }
            });
    
            if (maybeException instanceof Exception) {
                selectedKeys = null;
                Exception e = (Exception) maybeException;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
                return new SelectorTuple(unwrappedSelector);
            }
            selectedKeys = selectedKeySet;
            logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
            return new SelectorTuple(unwrappedSelector,
                                     new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
        }
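The point of replacing the selector's key sets is that an array-backed set makes adding a ready key a plain array write. The class below is only a sketch of that idea under stated assumptions: `ArrayKeySetSketch` and `reset` are hypothetical names, the initial capacity of 4 is arbitrary, and it skips duplicate detection on the assumption that the producer adds each key at most once per round.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical array-backed "set": O(1) append, no hashing, no per-entry node allocation.
final class ArrayKeySetSketch<E> extends AbstractSet<E> {
    private Object[] keys = new Object[4]; // arbitrary initial capacity for the sketch
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1); // double the array when it is full
        }
        keys[size++] = e;
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    // Clears the entries so the same array can be reused for the next round.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int i;

            @Override
            public boolean hasNext() {
                return i < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[i++];
            }
        };
    }
}
```

Compared with a HashSet, iteration is a linear walk over a dense array, which suits a loop that reads every selected key once and then discards the whole batch.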
        
    

    (3) chooserFactory.newChooser() [thread chooser]: assigns NioEventLoop threads to new connections evenly, in round-robin order

    #DefaultEventExecutorChooserFactory class
    public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    
        public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
    
        private DefaultEventExecutorChooserFactory() { }
    
        @SuppressWarnings("unchecked")
        @Override
        public EventExecutorChooser newChooser(EventExecutor[] executors) {
            if (isPowerOfTwo(executors.length)) {
                return new PowerOfTwoEventExecutorChooser(executors);
            } else {
                return new GenericEventExecutorChooser(executors);
            }
        }
    
        //checks whether the length is a power of two; if so, the faster PowerOfTwoEventExecutorChooser is used
        private static boolean isPowerOfTwo(int val) {
            return (val & -val) == val;
        }
    
        private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                return executors[idx.getAndIncrement() & executors.length - 1];//& is cheaper than modulo; the index wraps around
            }
        }
    
        private static final class GenericEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            GenericEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                return executors[Math.abs(idx.getAndIncrement() % executors.length)];//modulo
            }
        }
    }
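The two selection strategies can be tried out without any EventExecutor objects. The sketch below reuses the same bit trick and index arithmetic as the listing above but returns plain indices; `ChooserSketch` and `roundRobin` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical index-only version of the two choosers.
final class ChooserSketch {

    // A power of two has a single set bit, so isolating the lowest set bit gives the value back.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Returns the first `picks` indices chosen for an executor array of the given length.
    static int[] roundRobin(int length, int picks) {
        AtomicInteger idx = new AtomicInteger();
        int[] chosen = new int[picks];
        for (int i = 0; i < picks; i++) {
            int n = idx.getAndIncrement();
            chosen[i] = isPowerOfTwo(length)
                    ? n & (length - 1)      // bit mask, valid only for power-of-two lengths
                    : Math.abs(n % length); // general case
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(roundRobin(4, 6))); // [0, 1, 2, 3, 0, 1]
        System.out.println(java.util.Arrays.toString(roundRobin(3, 5))); // [0, 1, 2, 0, 1]
    }
}
```

Both branches hand out indices in the same cyclic order; the mask variant only replaces the division with a single AND, which is why the factory prefers it whenever the thread count allows.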
    

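    2. NioEventLoop startup flow

    NioEventLoop startup steps:

    • bind->execute(task) [entry point]: the port binding is wrapped in a task and passed to the NioEventLoop's execute() method
    • startThread()->doStartThread() [thread creation]: when the caller is not the NioEventLoop's own thread, startThread() creates and starts that thread
    • ThreadPerTaskExecutor.execute(): the executor runs the task by creating and starting a FastThreadLocalThread
    • NioEventLoop.run() [start]
    #AbstractBootstrap class, doBind method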
    private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
    
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    
    #AbstractBootstrap class, doBind0 method
    private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            //calls the execute method of SingleThreadEventExecutor
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
        
    #execute method of SingleThreadEventExecutor
    public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();//is the caller running on this event loop's thread?
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();//start the thread if it has not been started yet
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    
        public boolean inEventLoop() {
            return inEventLoop(Thread.currentThread());
        }
    
        public boolean inEventLoop(Thread thread) {
            return thread == this.thread;
        }
        
        private void startThread() {
            if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    doStartThread();
                }
            }
        }
    
    //start the thread
    private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();//remember this thread; inEventLoop() compares against it
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();//enters NioEventLoop.run()
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
    
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                    "before run() implementation terminates.");
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.release();
                                if (!taskQueue.isEmpty()) {
                                    logger.warn(
                                            "An event executor terminated with " +
                                                    "non-empty task queue (" + taskQueue.size() + ')');
                                }
    
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
        }
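The startup path boils down to three ideas: start the thread lazily on the first external execute(), guard the start with a compare-and-set so it happens once, and let inEventLoop() compare the caller with the remembered thread. The sketch below shows only those ideas. It is not Netty code: a LinkedBlockingQueue stands in for the MPSC queue and the selector wake-up, shutdown handling is reduced to a flag, and the names `LazyLoopSketch`, `stop`, and `demo` are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-threaded loop that starts its thread on the first execute() call.
final class LazyLoopSketch {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    final AtomicInteger threadsStarted = new AtomicInteger();
    private volatile Thread thread;
    private volatile boolean running = true;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // a no-op after the first successful start
        }
        taskQueue.add(task);
    }

    private void startThread() {
        // The CAS ensures that only one caller ever creates the loop thread.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            threadsStarted.incrementAndGet();
            Thread t = new Thread(() -> {
                thread = Thread.currentThread(); // remembered for inEventLoop()
                while (running) {
                    try {
                        Runnable task = taskQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (task != null) {
                            task.run();
                        }
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }, "sketch-loop");
            t.setDaemon(true);
            t.start();
        }
    }

    void stop() {
        running = false;
    }

    // Submits three tasks from the calling thread; returns {threads started, tasks run on the loop}.
    static int[] demo() {
        LazyLoopSketch loop = new LazyLoopSketch();
        AtomicInteger ranOnLoop = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            loop.execute(() -> {
                if (loop.inEventLoop()) {
                    ranOnLoop.incrementAndGet();
                }
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.stop();
        return new int[] {loop.threadsStarted.get(), ranOnLoop.get()};
    }
}
```

`demo()` reports one started thread and three tasks executed on it, which matches the behaviour described for bind: the first execute() from the main thread is what brings the loop thread to life.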
    

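    3. NioEventLoop execution

    SingleThreadEventExecutor.this.run() -> NioEventLoop.run():
    run()->for(;;)

    • select() [check for I/O events]: polls the I/O events of the channels registered on the selector
    • processSelectedKeys() [handle I/O events]
    • runAllTasks() [drain the task queue]: runs the tasks that external threads put into taskQueue

    (1) NioEventLoop's run method
    #run method of NioEventLoop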
    protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));//getAndSet(false) returns the old value and resets wakenUp to false
    
                            // 'wakenUp.compareAndSet(false, true)' is always evaluated
                            // before calling 'selector.wakeup()' to reduce the wake-up
                            // overhead. (Selector.wakeup() is an expensive operation.)
                            //
                            // However, there is a race condition in this approach.
                            // The race condition is triggered when 'wakenUp' is set to
                            // true too early.
                            //
                            // 'wakenUp' is set to true too early if:
                            // 1) Selector is waken up between 'wakenUp.set(false)' and
                            //    'selector.select(...)'. (BAD)
                            // 2) Selector is waken up between 'selector.select(...)' and
                            //    'if (wakenUp.get()) { ... }'. (OK)
                            //
                            // In the first case, 'wakenUp' is set to true and the
                            // following 'selector.select(...)' will wake up immediately.
                            // Until 'wakenUp' is set to false again in the next round,
                            // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                            // any attempt to wake up the Selector will fail, too, causing
                            // the following 'selector.select(...)' call to block
                            // unnecessarily.
                            //
                            // To fix this problem, we wake up the selector again if wakenUp
                            // is true immediately after selector.select(...).
                            // It is inefficient in that it wakes up the selector for both
                            // the first case (BAD - wake-up required) and the second case
                            // (OK - no wake-up required).
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                            // fallthrough
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
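                        //ioRatio controls how time is split between processSelectedKeys() (I/O) and runAllTasks(),
                        //which runs the tasks external threads put into taskQueue. The default is 50, i.e. a 1:1 split.
                        //With ioRatio == 100 all pending tasks run with no time budget.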
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
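The time budget passed to runAllTasks() in the else branch is simple arithmetic, and a few sample values show how ioRatio shifts the balance. The helper name `taskBudgetNanos` and the sample durations are hypothetical; the expression itself is the one from the listing.

```java
// Hypothetical helper reproducing the budget expression ioTime * (100 - ioRatio) / ioRatio.
final class IoRatioSketch {

    // Time allowed for queued tasks after ioTimeNanos were spent on I/O, for an ioRatio in 1..99.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 1..99)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        System.out.println(taskBudgetNanos(2_000_000L, 50)); // 2000000: tasks get as long as I/O took
        System.out.println(taskBudgetNanos(8_000_000L, 80)); // 2000000: tasks get a quarter of the I/O time
        System.out.println(taskBudgetNanos(1_000_000L, 20)); // 4000000: tasks get four times the I/O time
    }
}
```

The budget is derived from the I/O time measured in the same iteration, so a loop that spent little time on I/O also grants little time to queued tasks before it goes back to select().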
        
    

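    (2) select() execution logic (detecting I/O events):

    • Deadline and task interleaving: compute the deadline for this select (based on whether the NioEventLoop has a scheduled task pending) and check whether tasks have arrived that must be handled instead of blocking
    • Blocking select: if the deadline has not passed and the task queue is empty, perform one blocking select
    • Working around the JDK empty-polling bug: check whether the select really blocked for timeoutMillis. If it returned earlier with nothing selected, count it as a possible empty poll; once the count reaches the threshold, rebuildSelector() replaces the selector with a new one so that the empty polling does not continue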
    private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//deadline
                for (;;) {
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {
                        if (selectCnt == 0) {
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                    // Selector#wakeup. So we need to check task queue again before executing select operation.
                    // If we don't, the task might be pended until select operation was timed out.
                    // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) {//taskQueue has tasks: set wakenUp to true and select without blocking
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
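                    //deadline not reached and taskQueue empty: do a blocking select
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;//count this select round
                    //leave the loop if I/O events were selected, a wake-up was requested before or during the select, or the task queue or scheduled-task queue is non-empty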
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    if (Thread.interrupted()) {
                        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                        // As this is most likely a bug in the handler of the user or it's client library we will
                        // also log it.
                        //
                        // See https://github.com/netty/netty/issues/2426
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely because " +
                                    "Thread.currentThread().interrupt() was called. Use " +
                                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                        }
                        selectCnt = 1;
                        break;
                    }
                    //guard against a busy loop
                    long time = System.nanoTime();
                    //time - currentTimeNanos >= timeout means the select really blocked for the full timeout
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        //returned before the timeout, i.e. a possible empty poll; rebuild once this has happened SELECTOR_AUTO_REBUILD_THRESHOLD times in a row
                        // The selector returned prematurely many times in a row.
                        // Rebuild the selector to work around the problem.
                        logger.warn(
                                "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                                selectCnt, selector);
    
                        rebuildSelector();//create a new selector
                        selector = this.selector;
    
                        // Select again to populate selectedKeys.
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    currentTimeNanos = time;
                }
    
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
                // Harmless exception - log anyway
            }
        }
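Two pieces of arithmetic in select() are easy to misread: the rounding of the timeout to whole milliseconds, and the counter that decides when to rebuild the selector. The sketch below isolates both as pure functions. It is a simplification with assumptions: the timeout is held constant across rounds (the real loop recomputes it each round), the elapsed times are supplied as test data instead of being measured, and the names `PrematureSelectSketch`, `timeoutMillis`, and `rebuildAt` are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, selector-free model of the timeout rounding and the empty-poll counter.
final class PrematureSelectSketch {

    // Adding 0.5 ms before the integer division rounds the remaining time to the nearest millisecond.
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }

    // Given how long each select call actually took, returns the 1-based round in which
    // a rebuild would be triggered, or -1 if the threshold is never reached.
    static int rebuildAt(long[] elapsedNanos, long timeoutMillis, int threshold) {
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        int selectCnt = 0;
        for (int round = 0; round < elapsedNanos.length; round++) {
            selectCnt++;
            if (elapsedNanos[round] >= timeoutNanos) {
                selectCnt = 1; // the select really blocked, so the streak starts over
            } else if (threshold > 0 && selectCnt >= threshold) {
                return round + 1; // too many early returns in a row
            }
        }
        return -1;
    }
}
```

For example, three immediate returns with a threshold of 3 trigger a rebuild in round 3, while a single full-length select in the middle of the sequence resets the counter to 1 and postpones the rebuild.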
        
        
        protected boolean hasTasks() {
            assert inEventLoop();
            return !taskQueue.isEmpty();
        }
        
        /** Works around the Java NIO empty-polling bug:
         * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
         * around the infamous epoll 100% CPU bug.
         */
        public void rebuildSelector() {
            if (!inEventLoop()) {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        rebuildSelector0();
                    }
                });
                return;
            }
            rebuildSelector0();
        }
    
        private void rebuildSelector0() {
            final Selector oldSelector = selector;
            final SelectorTuple newSelectorTuple;
    
            if (oldSelector == null) {
                return;
            }
    
            try {
                newSelectorTuple = openSelector();
            } catch (Exception e) {
                logger.warn("Failed to create a new Selector.", e);
                return;
            }
    
            // Register all channels to the new Selector.
            int nChannels = 0;
            for (SelectionKey key: oldSelector.keys()) { // re-register each channel with the new Selector
                Object a = key.attachment();
                try {
                    if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                        continue;
                    }
    
                    int interestOps = key.interestOps();
                    key.cancel();
                    SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                    if (a instanceof AbstractNioChannel) {
                        // Update SelectionKey
                        ((AbstractNioChannel) a).selectionKey = newKey;
                    }
                    nChannels ++;
                } catch (Exception e) {
                    logger.warn("Failed to re-register a Channel to the new Selector.", e);
                    if (a instanceof AbstractNioChannel) {
                        AbstractNioChannel ch = (AbstractNioChannel) a;
                        ch.unsafe().close(ch.unsafe().voidPromise());
                    } else {
                        @SuppressWarnings("unchecked")
                        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                        invokeChannelUnregistered(task, key, e);
                    }
                }
            }
    
            selector = newSelectorTuple.selector;
            unwrappedSelector = newSelectorTuple.unwrappedSelector;
    
            try {
                // time to close the old selector as everything else is registered to the new one
                oldSelector.close();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", t);
                }
            }
    
            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
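
The counting logic behind this workaround can be looked at in isolation. The following is a minimal sketch, not Netty's implementation: `PrematureReturnCounter` and its method names are hypothetical, and it ignores the cases Netty handles separately (explicit wakeups, pending tasks, thread interruption). It only shows the idea: a select() that returns early with no ready keys is counted, and once the count reaches a threshold (512 by default in Netty, configurable through the `io.netty.selectorAutoRebuildThreshold` system property) the selector should be rebuilt.

```java
/** Hypothetical helper, not part of Netty: counts premature select() returns. */
final class PrematureReturnCounter {
    private final int rebuildThreshold;
    private int selectCnt;

    PrematureReturnCounter(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /**
     * Records one return of Selector.select(timeout).
     *
     * @return true when the caller should rebuild the selector
     */
    boolean onSelectReturn(int readyKeys, long elapsedNanos, long timeoutNanos) {
        if (readyKeys > 0 || elapsedNanos >= timeoutNanos) {
            // Normal wake-up: keys are ready, or the full timeout elapsed.
            selectCnt = 0;
            return false;
        }
        // Returned early with nothing to do: a possible empty poll.
        selectCnt++;
        if (selectCnt >= rebuildThreshold) {
            selectCnt = 0;
            return true;
        }
        return false;
    }

    int count() {
        return selectCnt;
    }

    public static void main(String[] args) {
        PrematureReturnCounter counter = new PrematureReturnCounter(3);
        for (int i = 1; i <= 3; i++) {
            // Simulate select(1000ns) returning after 10ns with no ready keys.
            boolean rebuild = counter.onSelectReturn(0, 10, 1000);
            System.out.println("premature return " + i + ", rebuild=" + rebuild);
        }
    }
}
```

In a real loop the caller would measure the time spent inside `selector.select(timeoutMillis)` and, when the method returns true, open a new selector and re-register the channels, as `rebuildSelector0()` does above.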
    

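    (3) I/O event handling: how processSelectedKeys() works

    • Selected key set optimization: by default the JDK selector adds every ready key to an internal HashSet on each select operation. Netty uses reflection to replace that HashSet with an array-backed set, so adding a key is a plain array append with no hashing or collision handling, and the ready keys can be walked by index.
    • processSelectedKeysOptimized():
      reads the SelectionKey array directly from the SelectedSelectionKeySet;
      iterates over the array and takes each SelectionKey's attachment, which is the Netty NIO channel (AbstractNioChannel);
      if the SelectionKey is still valid, reads its ready I/O events and handles them.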
    private SelectorTuple openSelector() {
            final Selector unwrappedSelector;
            try {
                unwrappedSelector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEYSET_OPTIMIZATION) { // optimization disabled: return the plain JDK Selector (the optimization is on by default)
                return new SelectorTuple(unwrappedSelector);
            }
    
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                    } catch (Throwable cause) {
                        return cause;
                    }
                }
            });
            
            if (!(maybeSelectorImplClass instanceof Class) ||
                    // ensure the current selector implementation is what we can instrument.
                    !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
                if (maybeSelectorImplClass instanceof Throwable) {
                    Throwable t = (Throwable) maybeSelectorImplClass;
                    logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
                }
                return new SelectorTuple(unwrappedSelector);
            }
    
            final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
            // replace the selector's key sets via reflection
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                        Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
                        if (cause != null) {
                            return cause;
                        }
                        cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
                        if (cause != null) {
                            return cause;
                        }
    
                        selectedKeysField.set(unwrappedSelector, selectedKeySet); // assign via reflection
                        publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                        return null;
                    } catch (NoSuchFieldException e) {
                        return e;
                    } catch (IllegalAccessException e) {
                        return e;
                    }
                }
            });
    
            if (maybeException instanceof Exception) {
                selectedKeys = null;
                Exception e = (Exception) maybeException;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
                return new SelectorTuple(unwrappedSelector);
            }
            selectedKeys = selectedKeySet;
            logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
            return new SelectorTuple(unwrappedSelector,
                                     new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
        }
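
The reflection trick in openSelector() can be tried out on a stand-in class. This is a minimal sketch under stated assumptions: `DemoSelector`, `ArrayKeySet` and `KeySetSwapDemo` are hypothetical names, and the private field belongs to our own class rather than to `sun.nio.ch.SelectorImpl` (on newer JDKs with strong encapsulation, reflective access to JDK internals may be denied, which is why Netty treats the optimization as best effort and falls back to the plain selector).

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/** Hypothetical stand-in for a selector implementation with a private key set. */
final class DemoSelector {
    private Set<String> selectedKeys = new HashSet<>();

    /** Simulates the selector reporting a ready key. */
    void markReady(String key) {
        selectedKeys.add(key);
    }
}

/** Append-only, array-backed set in the spirit of SelectedSelectionKeySet. */
final class ArrayKeySet extends AbstractSet<String> {
    String[] keys = new String[4];
    int size;

    @Override
    public boolean add(String key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1); // double the capacity
        }
        keys[size++] = key; // plain array append, no hashing
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<String> iterator() {
        return Arrays.asList(keys).subList(0, size).iterator();
    }
}

final class KeySetSwapDemo {
    /** Replaces DemoSelector's private set with the given one via reflection. */
    static void install(DemoSelector selector, Set<String> replacement) {
        try {
            Field field = DemoSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, replacement);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not replace the key set", e);
        }
    }

    public static void main(String[] args) {
        DemoSelector selector = new DemoSelector();
        ArrayKeySet keySet = new ArrayKeySet();
        install(selector, keySet);
        selector.markReady("channel-1");
        // The selector now writes into our array, which we can read by index.
        System.out.println(keySet.size + " ready key(s), first = " + keySet.keys[0]);
    }
}
```

Because the event loop keeps its own reference to the replacement set, it can read the ready keys by index without asking the selector for an iterator.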
        
        
    # SelectedSelectionKeySet replaces the HashSet; it is array-backed, see add()
    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    
        SelectionKey[] keys;
        int size;
    
        SelectedSelectionKeySet() {
            keys = new SelectionKey[1024];
        }
    
        @Override
        public boolean add(SelectionKey o) {
            if (o == null) {
                return false;
            }
    
            keys[size++] = o; // O(1) array append; HashSet.add is also O(1) on average but pays for hashing and collision handling
            if (size == keys.length) {
                increaseCapacity();
            }
    
            return true;
        }
    
        @Override
        public int size() {
            return size;
        }
    
        @Override
        public boolean remove(Object o) {
            return false;
        }
    
        @Override
        public boolean contains(Object o) {
            return false;
        }
    
        @Override
        public Iterator<SelectionKey> iterator() {
            throw new UnsupportedOperationException();
        }
    
        void reset() {
            reset(0);
        }
    
        void reset(int start) {
            Arrays.fill(keys, start, size, null);
            size = 0;
        }
    
        private void increaseCapacity() {
            SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
            System.arraycopy(keys, 0, newKeys, 0, size);
            keys = newKeys;
        }
    }
    
    
    // NioEventLoop.run() calls processSelectedKeys()
    private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    
    # Handle I/O events
    private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) { // iterate over the SelectedSelectionKeySet array
                final SelectionKey k = selectedKeys.keys[i];
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.keys[i] = null;
    
                final Object a = k.attachment(); // the attachment is the Netty NIO channel
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a); // handle the I/O event
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    
    
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }   
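
The order in which processSelectedKey() reacts to the ready-ops bitmask can be reproduced with the JDK constants alone. This is a minimal sketch: `ReadyOpsDemo` and `dispatchOrder` are hypothetical names, and the returned strings merely name the Netty calls made in the listing above.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsDemo {
    /** Returns the actions in the order processSelectedKey() performs them. */
    static List<String> dispatchOrder(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect"); // must happen before any read or write
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush"); // writes first, to free queued buffers
        }
        // readyOps == 0 is also treated as a read, to avoid a possible spin loop.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        int readyOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(dispatchOrder(readyOps));
    }
}
```

Note that OP_ACCEPT takes the same branch as OP_READ: for a server channel, `unsafe.read()` accepts new connections rather than reading bytes.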
    

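    (4) Task execution: how runAllTasks() works

    • Task types and how they are added: there are ordinary tasks and scheduled tasks.
      The ordinary task queue is an MPSC queue created in the NioEventLoop constructor; external threads add tasks to it through execute(), which calls addTask().
      schedule() wraps the work in a ScheduledFutureTask and adds it to the scheduled task queue; when called from an external thread, the add itself is submitted as an ordinary task via execute(), so the scheduled task queue is only ever touched by the event loop thread.
    • Task aggregation -> fetchFromScheduledTaskQueue(): moves the scheduled tasks that are due into the ordinary task queue.
    • Task execution: polls tasks from the ordinary task queue and runs each with safeExecute(); after every 64 tasks it checks whether the deadline has passed and, if so, stops and leaves the remaining tasks for the next loop iteration.
    # The taskQueue is created in the NioEventLoop constructor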
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }
    
    # Superclass constructor
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, int maxPendingTasks,
                                            RejectedExecutionHandler rejectedHandler) {
            super(parent);
            this.addTaskWakesUp = addTaskWakesUp;
            this.maxPendingTasks = Math.max(16, maxPendingTasks);
            this.executor = ObjectUtil.checkNotNull(executor, "executor");
            taskQueue = newTaskQueue(this.maxPendingTasks); // the task queue is created here
            rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
        }
    
    #NioEventLoop
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
            // This event loop never calls takeTask()
            return PlatformDependent.newMpscQueue(maxPendingTasks);
    }
    
    // Adding to the ordinary task queue
    #SingleThreadEventExecutor
    public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
        
    #SingleThreadEventExecutor
    protected void addTask(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
            if (!offerTask(task)) {
                reject(task);
            }
        }
    
        final boolean offerTask(Runnable task) {
            if (isShutdown()) {
                reject();
            }
            return taskQueue.offer(task);
        }
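
The pattern in execute() (check whether the caller is the loop thread, start the thread lazily on the first outside call, then enqueue) can be sketched with the JDK alone. This is a minimal sketch, not Netty's code: `MiniLoop` is a hypothetical class, it uses a `LinkedBlockingQueue` instead of an MPSC queue, and it leaves out shutdown, rejection and selector wakeups.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical miniature of a single-threaded executor with lazy thread start. */
final class MiniLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    /** True when the caller is running on this loop's own thread. */
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!inEventLoop()) {
            startThread(); // the first call from outside starts the loop thread
        }
        taskQueue.offer(task);
    }

    private void startThread() {
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "mini-loop");
            t.setDaemon(true);
            thread = t; // published before start(), so tasks see it
            t.start();
        }
    }

    private void run() {
        try {
            for (;;) {
                taskQueue.take().run(); // tasks run one at a time, in order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MiniLoop loop = new MiniLoop();
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        loop.execute(() -> ranOn.complete(Thread.currentThread().getName()));
        System.out.println("task ran on thread: " + ranOn.join());
    }
}
```

Because every task runs on the one loop thread, state touched only by tasks needs no locking, which is the serial, lock-free design the article describes.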
        
    // Scheduled task queue
    #AbstractScheduledEventExecutor 
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            ObjectUtil.checkNotNull(callable, "callable");
            ObjectUtil.checkNotNull(unit, "unit");
            if (delay < 0) {
                delay = 0;
            }
            return schedule(new ScheduledFutureTask<V>(
                    this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
        }
        
    <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
            if (inEventLoop()) { // already on this EventLoop's thread; otherwise the call comes from an external thread
                scheduledTaskQueue().add(task);
            } else {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        scheduledTaskQueue().add(task);
                    }
                });
            }
    
            return task;
        }
    
    // Not thread-safe: only accessed from the event loop thread
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
            if (scheduledTaskQueue == null) {
                scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
            }
            return scheduledTaskQueue;
        }
        
    #runAllTasks
    protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue(); // move all due tasks from the scheduled task queue (ordered by deadline) into taskQueue
            Runnable task = pollTask();
            if (task == null) {
                afterRunningAllTasks();
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                safeExecute(task);
    
                runTasks ++;
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
        
    // Fetch tasks from the scheduled task queue
    private boolean fetchFromScheduledTaskQueue() {
            long nanoTime = AbstractScheduledEventExecutor.nanoTime();
            Runnable scheduledTask  = pollScheduledTask(nanoTime); // take a task whose deadline is at or before nanoTime
            while (scheduledTask != null) {
                if (!taskQueue.offer(scheduledTask)) {
                    // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                    scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                    return false;
                }
                scheduledTask  = pollScheduledTask(nanoTime);
            }
            return true;
        }
        
    #AbstractScheduledEventExecutor
    protected final Runnable pollScheduledTask(long nanoTime) {
            assert inEventLoop();
    
            Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
            // peek at the head task (the one with the earliest deadline)
            ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
            if (scheduledTask == null) {
                return null;
            }
    
            if (scheduledTask.deadlineNanos() <= nanoTime) {
                scheduledTaskQueue.remove(); // deadline <= nanoTime: the task is due, remove it
                return scheduledTask;
            }
            return null; // no task is due yet
        }
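
The aggregation step can be sketched with a plain `PriorityQueue`. This is a minimal sketch under stated assumptions: `ScheduledMergeDemo`, `TimedTask` and `fetchDue` are hypothetical names, and deadlines are plain numbers supplied by the caller instead of values derived from `System.nanoTime()`.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;

final class ScheduledMergeDemo {
    /** Hypothetical scheduled task: a name plus an absolute deadline. */
    static final class TimedTask implements Comparable<TimedTask> {
        final String name;
        final long deadlineNanos;

        TimedTask(String name, long deadlineNanos) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
        }

        @Override
        public int compareTo(TimedTask other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    /** Moves every task whose deadline is at or before nowNanos; returns the number moved. */
    static int fetchDue(PriorityQueue<TimedTask> scheduled, Queue<TimedTask> taskQueue, long nowNanos) {
        int moved = 0;
        for (TimedTask head = scheduled.peek();
             head != null && head.deadlineNanos <= nowNanos;
             head = scheduled.peek()) {
            scheduled.remove();
            if (!taskQueue.offer(head)) {
                // Target queue is full: put the task back and retry on a later pass.
                scheduled.add(head);
                break;
            }
            moved++;
        }
        return moved;
    }

    public static void main(String[] args) {
        PriorityQueue<TimedTask> scheduled = new PriorityQueue<>();
        scheduled.add(new TimedTask("late", 30));
        scheduled.add(new TimedTask("early", 10));
        Queue<TimedTask> taskQueue = new ArrayDeque<>();
        int moved = fetchDue(scheduled, taskQueue, 20);
        System.out.println("moved " + moved + ", still scheduled " + scheduled.size());
    }
}
```

Since the priority queue is ordered by deadline, the loop can stop at the first task that is not yet due; nothing behind it can be due either.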
        
        
    // Executing tasks
    #SingleThreadEventExecutor
    protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask(); // take the next task
            if (task == null) {
                afterRunningAllTasks();
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // compute the deadline
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                safeExecute(task);
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                // If the deadline has passed at that point, stop and leave the remaining tasks for the next loop iteration.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) { // no tasks left: exit the loop
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
        
        protected Runnable pollTask() {
            assert inEventLoop();
            return pollTaskFrom(taskQueue);
        }
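
The time-budgeted loop in runAllTasks() can be sketched without Netty. This is a minimal sketch: `BudgetedRunner` and `runTasks` are hypothetical names, and the clock is passed in as a `LongSupplier` (an assumption made here so the behaviour can be exercised with a fake clock; Netty reads `ScheduledFutureTask.nanoTime()` directly).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

final class BudgetedRunner {
    /** Runs queued tasks until the queue is empty or the time budget is spent; returns tasks run. */
    static int runTasks(Queue<Runnable> queue, long budgetNanos, LongSupplier clock) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = clock.getAsLong() + budgetNanos;
        int ran = 0;
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                // Like safeExecute(): one failing task must not stop the loop.
                System.err.println("task failed: " + t);
            }
            ran++;
            // Read the clock only once every 64 tasks (0x3F == 63).
            if ((ran & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break;
            }
            task = queue.poll();
            if (task == null) {
                break;
            }
        }
        return ran;
    }

    public static void main(String[] args) {
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < 200; i++) {
            queue.add(() -> { });
        }
        int ran = runTasks(queue, 1_000_000_000L, System::nanoTime);
        System.out.println("ran " + ran + " task(s), " + queue.size() + " left");
    }
}
```

One consequence of checking only every 64 tasks is that the budget is a soft limit: a batch of slow tasks can overrun the deadline before the next check.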
    

          Article title: Netty series (3): EventLoop and EventLoop

          Article link: https://www.haomeiwen.com/subject/hrmemftx.html