美文网首页
Netty源码分析----服务启动之开始接收请求及其他细节

Netty源码分析----服务启动之开始接收请求及其他细节

作者: _六道木 | 来源:发表于2018-05-28 00:04 被阅读26次

    (*文章基于Netty4.1.22版本)
    在上一篇文章Netty源码分析----服务启动之Channel初始化中,一开始给出了一个NIO的demo,然后从构造方法开始分析Netty对应的封装操作流程,而这篇文章,而这篇文章会开始分析,当初始化完成之后,Netty是如何开始接收请求的。
    先看下上一篇文章NIO的demo中,是如何接收请求的(只保留run方法,其他忽略)

    public class NioServer implements Runnable {
            //....
        public void run() {
            while (true) {
                try {
                    // 无论是否有事件发送,selector每隔1s被唤醒一次
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectedKeys.iterator();
                    SelectionKey selectionKey = null;
                    while (iterator.hasNext()) {
                        //....SelectionKey处理
                    }
                } catch (IOException e) {
                }
            }
        }
    }
    

    源码分析

    Netty如何开始接收请求

    那么这次需要看下Netty这部分代码在哪里,且是如何触发的。回顾一下,在Netty源码分析----服务启动之Channel初始化中分析到,Channel会注册到Selector中,而这个Selector是在EventLoop中初始化的,那么也就是说,Selector对Channel的选择应该是在EventLoop中的,由于我们使用的是NioEventLoopGroup,所以创建的EventLoop是NioEventLoop,那么到NioEventLoop中看下,核心代码是run方法:

        protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                            // fall through
                        default:
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    

    在其中我们也能看到NIO使用Selector去获取SelectionKey的影子,但这篇文章不会去详细分析run的逻辑,这个会在以后再进行分析,这里只分析启动的时候触发的流程。

    从bind方法开始,有些方法会调用channel.eventLoop().execute这个方法,该方法实际调用的是SingleThreadEventExecutor的execute方法,看下这个的逻辑

        public void execute(Runnable task) {
            //....
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    

    一开始inEventLoop为true,那么执行startThread和startTask方法,看下实现:

        private void startThread() {
            if (state == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    try {
                        doStartThread();
                    } catch (Throwable cause) {
                       //....
                    }
                }
            }
        }
        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } finally {
                        //....
                    }
                }
            });
        }
    

    使用线程池执行了一个异步的任务,首先会先设置thread = Thread.currentThread()(inEventLoop方法就是比较thread属性是否等于当前线程,由于一开始空,所以为false,还有其他的情况就是当前线程和EventLoop的线程是否同一个),然后调用了run方法,那么这时候就是执行的NioEventLoop的run方法了。
    再看下addTask的实现

        protected void addTask(Runnable task) {
            //....
            if (!offerTask(task)) {
                reject(task);
            }
        }
        final boolean offerTask(Runnable task) {
            if (isShutdown()) {
                reject();
            }
            return taskQueue.offer(task);
        }
    

    addTask就是将任务放到队列中,顺便的说一下在NioEventLoop的方法里,会分开两部分时间,有一部分时间执行io任务,就是处理连接,read,write等事件,一部分就是处理自定义的任务,就是通过EventLoop的execute方法加入的任务,也就是队列里的任务。
    整个流程图如下:


    image.png

    注:startTread方法不一定是在注册的时候调用的,因为启动该线程的条件是添加任务的时候还未启动,刚好这里注册的时候第一次加入任务,所以在这里启动,如果其他版本在这之前还加入了任务,那么到注册的时候就是直接加入任务,而不需要启动线程

    其他初始化

    从initAndRegister开始看起,因为上篇文章有些细节没有看完

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                // ....
            }
           // ....
            return regFuture;
        }
    

    这个init上篇文章没有分析到,这个实现在ServerBootstrap中

        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    代码虽然有点长,但是逻辑不复杂,主要分几点:

    1. 在Channel中设置属性
    2. 添加一个Handler到ChannelPipeline中

    ChannelPipeline如何添加Handler以及ChannelPipeline的原理,其他文章会进行详细分析,暂时把其当成一个链表就OK,这里就是把一个Handler加入到链表中,此时链表只有这个Handler(事实有一个Head和Tail,忽略)。
    这个Handler的实现中又会将其他Handler加入到ChannelPipeline中,那么这个ChannelInitializer类型的Handler是几时调用的呢?再往后看
    init方法后面就是register的代码了,这里上篇文章说到他会调用到AbstractUnsafe的register方法,这里也有些细节没有看到,那么进入register方法看下(register又调用了核心逻辑的registe0r方法,直接看register0方法)

            private void register0(ChannelPromise promise) {
                try {
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    

    invokeHandlerAddedIfNeeded这个方法还没分析过,先看下这个方法是不是我们要找的

        final void invokeHandlerAddedIfNeeded() {
            assert channel.eventLoop().inEventLoop();
            if (firstRegistration) {
                firstRegistration = false;
                // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
                // that were added before the registration was done.
                callHandlerAddedForAllHandlers();
            }
        }
    

    如果是第一次注册,那么会调用callHandlerAddedForAllHandlers方法,那么就是说,其实这个方法只会调用一次。再看下callHandlerAddedForAllHandlers上面的注释,大概的意思是

    在将Channel注册到Selector上的时候,应该调用在注册前添加的Handler

    额,刚好注册前添加的Handler就是上面说的ChannelInitializer,那么进去看看是如何调用的

        private void callHandlerAddedForAllHandlers() {
            final PendingHandlerCallback pendingHandlerCallbackHead;
            synchronized (this) {
                assert !registered;
    
                // This Channel itself was registered.
                registered = true;
    
                pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
                // Null out so it can be GC'ed.
                this.pendingHandlerCallbackHead = null;
            }
    
            // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
            // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
            // the EventLoop.
            PendingHandlerCallback task = pendingHandlerCallbackHead;
            while (task != null) {
                task.execute();
                task = task.next;
            }
        }
    

    逻辑很简单,这个方法只会在注册完成前调用,调用后就把registered 设置为true了,接着就是调用PendingHandlerCallback的execute方法,这个PendingHandlerCallback是什么呢,先看下Pipeline的addLast中的几句代码

        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            // ....
            synchronized (this) {
                // If the registered is false it means that the channel was not registered on an eventloop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
                //....
            }
            return this;
        }
        private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
            assert !registered;
    
            PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
            PendingHandlerCallback pending = pendingHandlerCallbackHead;
            if (pending == null) {
                pendingHandlerCallbackHead = task;
            } else {
                // Find the tail of the linked-list.
                while (pending.next != null) {
                    pending = pending.next;
                }
                pending.next = task;
            }
        }
    

    在添加的时候,只有在Channel未注册到Selector的时候,才会对PendingHandlerCallback赋值,其也是个链表。
    回到callHandlerAddedForAllHandlers方法,execute方法最终会调用到ChannelInitializer的handlerAdded方法

        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            if (ctx.channel().isRegistered()) {
                initChannel(ctx);
            }
        }
        @SuppressWarnings("unchecked")
        private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
            if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
                try {
                    initChannel((C) ctx.channel());
                } catch (Throwable cause) {
                } finally {
                    remove(ctx);
                }
                return true;
            }
            return false;
        }
    

    handlerAdded中会调用到initChannel方法,然后会把该Handler从ChannelPipeline中移除。

    其实ChannelInitializer实现了Channel注册后的为每一个Channel添加ChannelHandler的功能,但是其本质也是也是一个ChannelHandler

    相关文章

      网友评论

          本文标题:Netty源码分析----服务启动之开始接收请求及其他细节

          本文链接:https://www.haomeiwen.com/subject/nzctjftx.html