4_netty_NioEventLoop

Author: loading_17 | Published: 2018-06-20 07:46

    During bind, the initAndRegister method calls the register method of NioEventLoopGroup. That method is defined in the parent class MultithreadEventLoopGroup:

        public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }
        @Override
        public EventLoop next() {
            return (EventLoop) super.next();
        }
    

    In the MultithreadEventExecutorGroup class:

        @Override
        public EventExecutor next() {
            return chooser.next();
        }
    

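    As covered earlier in the NioEventLoopGroup analysis, the chooser is created by DefaultEventExecutorChooserFactory. When the number of event loops is a power of two, its inner class PowerOfTwoEventExecutorChooser is used, and its next method picks a NioEventLoop from the EventExecutor[] array in round-robin order.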
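
The selection step can be illustrated with a small standalone sketch. It is not Netty's source: the class name `ChooserSketch` and the use of strings in place of event loops are assumptions made for illustration. It shows why a power-of-two array length allows a bit mask to replace the modulo operation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; ChooserSketch is a hypothetical name, not a Netty class.
final class ChooserSketch {
    private final AtomicInteger idx = new AtomicInteger();
    private final String[] executors; // stands in for EventExecutor[]

    ChooserSketch(String[] executors) {
        if (executors.length == 0) {
            throw new IllegalArgumentException("at least one executor is required");
        }
        this.executors = executors;
    }

    static boolean isPowerOfTwo(int val) {
        return val > 0 && (val & -val) == val;
    }

    String next() {
        int i = idx.getAndIncrement();
        if (isPowerOfTwo(executors.length)) {
            // length - 1 is an all-ones bit mask, so & behaves like a cheap modulo
            // and stays non-negative even after the counter overflows.
            return executors[i & (executors.length - 1)];
        }
        // General case: plain modulo; abs keeps the index valid after overflow.
        return executors[Math.abs(i % executors.length)];
    }

    public static void main(String[] args) {
        ChooserSketch chooser =
                new ChooserSketch(new String[] {"loop-0", "loop-1", "loop-2", "loop-3"});
        for (int n = 0; n < 6; n++) {
            System.out.println(chooser.next());
        }
    }
}
```

With four entries the calls cycle through loop-0 to loop-3 and then wrap around to loop-0 again.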

    Back to next().register(channel): this calls the register method of SingleThreadEventLoop.

        @Override
        public ChannelFuture register(Channel channel) {
            return register(new DefaultChannelPromise(channel, this));
        }
        @Override
        public ChannelFuture register(final ChannelPromise promise) {
            ObjectUtil.checkNotNull(promise, "promise");
            promise.channel().unsafe().register(this, promise);
            return promise;
        }
    

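    Here promise.channel().unsafe() returns an AbstractNioMessageChannel$NioMessageUnsafe (the channel is a NioServerSocketChannel), and the call resolves to the register method inherited from AbstractChannel$AbstractUnsafe: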

            @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                if (eventLoop == null) {
                    throw new NullPointerException("eventLoop");
                }
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
    
                AbstractChannel.this.eventLoop = eventLoop;
    
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
    

    At this point the calling thread is not the event loop thread, so the else branch runs and eventLoop.execute() is called:

        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    

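    If the caller is not the event loop thread, execute() calls startThread(), which starts the thread only if it has not been started yet. startThread delegates to doStartThread, where SingleThreadEventExecutor uses a ThreadPerTaskExecutor to run a task. That task contains this line: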

     SingleThreadEventExecutor.this.run();
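
The lazy thread start can be sketched in isolation. The code below is a simplified illustration, not Netty's implementation: `LazyLoopSketch`, the `STOP` marker and the plain `Thread` (in place of ThreadPerTaskExecutor) are assumptions, and shutdown and rejection handling are reduced to the bare minimum.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical single-thread executor that starts its thread on first use.
final class LazyLoopSketch {
    private static final Runnable STOP = () -> { }; // marker task that ends the loop

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!inEventLoop()) {
            startThread(); // no-op after the first successful call
        }
        taskQueue.add(task);
    }

    private void startThread() {
        // The compare-and-set guarantees that only one caller creates the thread.
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "sketch-loop");
            thread = t;
            t.start();
        }
    }

    private void run() {
        try {
            for (;;) {
                Runnable task = taskQueue.take();
                if (task == STOP) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void shutdown() {
        execute(STOP);
    }

    // Runs an outer task and a nested task; returns the thread name each one saw.
    static String[] demo() {
        LazyLoopSketch loop = new LazyLoopSketch();
        String[] seen = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            seen[0] = Thread.currentThread().getName();
            // Already on the loop thread here, so the nested task is only queued.
            loop.execute(() -> {
                seen[1] = Thread.currentThread().getName();
                done.countDown();
            });
        });
        try {
            boolean finished = done.await(5, TimeUnit.SECONDS);
            loop.shutdown();
            if (!finished) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", demo()));
    }
}
```

Both tasks run on the single loop thread: the first call to execute comes from outside and triggers the start, while the nested call comes from the loop thread itself and only enqueues.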
    

    SingleThreadEventExecutor.this.run() dispatches to the run method of the subclass NioEventLoop:

        protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                            // fallthrough
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    

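    Start with hasTasks() inside the switch.
    If the task queue is not empty, the strategy calls selectNow(), which returns immediately with the number of IO events that are currently ready.
    If there are no tasks, select(wakenUp.getAndSet(false)) runs instead and may block until an event arrives or the loop is woken up.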
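
The decision itself fits in a few lines. The sketch below is an approximation, not Netty's code: it uses `java.util.function.IntSupplier` instead of Netty's own supplier type, and the constant values are assumptions chosen to be negative so that they cannot collide with a ready-key count.

```java
import java.util.function.IntSupplier;

// Hypothetical stand-in for the select strategy consulted at the top of the loop.
final class SelectStrategySketch {
    static final int SELECT = -1;   // assumed value: do a blocking select
    static final int CONTINUE = -2; // assumed value: skip this iteration

    // Pending tasks: poll without blocking so the tasks are not delayed.
    // No pending tasks: ask the caller to do a blocking select.
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    static String describe(int strategy) {
        switch (strategy) {
            case CONTINUE:
                return "skip this iteration";
            case SELECT:
                return "blocking select";
            default:
                return "selectNow reported " + strategy + " ready key(s)";
        }
    }

    public static void main(String[] args) {
        IntSupplier selectNow = () -> 2; // pretend two keys are ready
        System.out.println(describe(calculateStrategy(selectNow, true)));
        System.out.println(describe(calculateStrategy(selectNow, false)));
    }
}
```

A non-negative result falls into the default branch of the switch in run(), so the loop goes straight on to processing keys and tasks.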
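    Note the ioRatio field: it is the percentage of each loop iteration's time given to IO processing, and the remaining share bounds how long runAllTasks may run. When ioRatio is 100, runAllTasks runs without a time limit.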
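
The budget passed to runAllTasks is plain integer arithmetic, reproduced here as a standalone sketch. The class and method names are hypothetical; only the formula ioTime * (100 - ioRatio) / ioRatio comes from the run() method shown above.

```java
// Hypothetical helper that reproduces the task-time budget computed in run().
final class IoRatioSketch {
    // ioRatio is the percentage of loop time meant for IO; the rest goes to tasks.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // run() handles ioRatio == 100 in a separate branch with no time limit.
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // 8 ms spent in processSelectedKeys
        for (int ratio : new int[] {50, 80, 20}) {
            System.out.println("ioRatio=" + ratio + " -> task budget "
                    + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

For 8 ms of IO time, an ioRatio of 50 gives tasks the same 8 ms, 80 gives them 2 ms, and 20 gives them 32 ms.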
    Next, look at how processSelectedKeys() does its work.

        private void processSelectedKeys() {
            // If selectedKeys is non-null, take the optimized path
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }

        private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {
                final SelectionKey k = selectedKeys.keys[i];
                selectedKeys.keys[i] = null;
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    

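    k.attachment() returns an object, so when was that object attached?
    It happens during channel registration: the task submitted by AbstractChannel$AbstractUnsafe.register runs register0, which calls doRegister in AbstractNioChannel, which in turn calls javaChannel().register().
    That JDK method contains the statement k.attach(att), and att here is the channel itself, a NioServerSocketChannel.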
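
The attachment round trip can be reproduced with plain JDK NIO, independent of Netty. In this sketch the class name and the marker object are assumptions for illustration; the three-argument `register` call and `SelectionKey.attachment()` are standard `java.nio.channels` API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo: what is attached at registration comes back from the key.
final class AttachmentSketch {
    static Object registerAndReadBack(Object attachment) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // required before registering with a selector
            // Interest ops 0: register first, enable OP_ACCEPT later.
            SelectionKey key = server.register(selector, 0, attachment);
            return key.attachment();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Object marker = new Object(); // stands in for the channel wrapper
        System.out.println(registerAndReadBack(marker) == marker);
    }
}
```

Attaching the wrapper object lets the loop go from a ready SelectionKey straight to the object that knows how to handle it, without a separate lookup table.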
    Now return to the method above that handles each selected key.

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
                    unsafe.finishConnect();
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    

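    This is where connect, write, read and accept events are handled.
    As a thread, an EventLoop has to do two things: process IO events and run the tasks queued on it.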
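
The bit tests in processSelectedKey can be tried out without any sockets. The sketch below is illustrative only: `ReadyOpsSketch` is a hypothetical name, and the returned strings merely label which unsafe call the real method would make for a given readyOps value.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder mirroring the order of checks in processSelectedKey.
final class ReadyOpsSketch {
    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is also routed to read, exactly as in the method above.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(SelectionKey.OP_ACCEPT));
        System.out.println(dispatch(SelectionKey.OP_WRITE | SelectionKey.OP_READ));
        System.out.println(dispatch(SelectionKey.OP_CONNECT));
    }
}
```

Accept and read share one branch, which is why a server channel and a client channel can both be driven through unsafe.read().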
