netty Analysis (1) -- Server Startup Flow

Author: msrpp | Published: 2018-09-11 22:14 | Read 103 times

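    If you are not yet familiar with socket programming on the JDK's native NIO, read the prerequisite post first.

    A simple demo program

    Let's start with the echo server from netty's own examples.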

    /*
     * Copyright 2012 The Netty Project
     *
     * The Netty Project licenses this file to you under the Apache License,
     * version 2.0 (the "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at:
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations
     * under the License.
     */
    package io.netty.example.echo;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.util.SelfSignedCertificate;
    
    /**
     * Echoes back any received data from a client.
     */
    public final class EchoServer {
    
        static final boolean SSL = System.getProperty("ssl") != null;
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                SelfSignedCertificate ssc = new SelfSignedCertificate();
                sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
            } else {
                sslCtx = null;
            }
    
            // Configure the server.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            final EchoServerHandler serverHandler = new EchoServerHandler();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 100)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc()));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(serverHandler);
                     }
                 });
    
                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    

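    The code is concise but hard to follow at first, because the classes it uses look nothing like native NIO programming. Let's go through it briefly.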

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup);
    
    

    This creates a ServerBootstrap, the startup helper class, and hands it the boss and worker thread groups.

    b.channel(NioServerSocketChannel.class);
    
    

    This sets the channel type. Internally, a ServerBootstrapChannelFactory is created to hold the class, which is used later to create the channel object.
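
    The idea of "keep the class now, create the object later" can be sketched in plain Java. This is only an illustration under assumptions: ReflectiveFactory and DemoChannel are hypothetical names, and the sketch uses a no-argument constructor, whereas netty's own factory passes constructor arguments.

```java
import java.lang.reflect.Constructor;

/** Hypothetical stand-in for a channel type; not a netty class. */
class DemoChannel {
    DemoChannel() { }
}

/** Keeps the Class object now and instantiates it reflectively later. */
final class ReflectiveFactory<T> {
    private final Class<? extends T> type;

    ReflectiveFactory(Class<? extends T> type) {
        this.type = type;
    }

    T newInstance() {
        try {
            // Assumption: the type has a no-argument constructor.
            Constructor<? extends T> ctor = type.getDeclaredConstructor();
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create " + type, e);
        }
    }
}

final class ReflectiveFactoryDemo {
    public static void main(String[] args) {
        ReflectiveFactory<DemoChannel> factory = new ReflectiveFactory<>(DemoChannel.class);
        DemoChannel first = factory.newInstance();
        DemoChannel second = factory.newInstance();
        System.out.println(first != second); // true: each call builds a fresh object
    }
}
```

    Storing the class rather than an instance lets the bootstrap be configured once and still produce a new channel object every time one is needed.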

    b.option(ChannelOption.SO_BACKLOG, 100);
    
    

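    This sets an option on the listening server socket: SO_BACKLOG is the length of its queue of pending connections. Options for the accepted client connections are set with childOption() instead.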

    b.childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc()));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(serverHandler);
                     }
                 });
    

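    This sets the initialization logic that runs on each SocketChannel once a client connection has been established.

    All of the code above only assigns parameters on the ServerBootstrap object. What actually gets netty running is the following line.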

    ChannelFuture f = b.bind(PORT).sync();
    
    

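    Before reading into this line, let's leave it as a cliffhanger and first get to know another class: NioEventLoop. Once NioEventLoop is understood, netty's threading model becomes clear and the rest of the analysis takes much less effort.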

    NioEventLoop

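    NioEventLoop is the most important object for interacting with the JDK's NIO layer. It is created inside a NioEventLoopGroup.
    NioEventLoopGroup holds an array named children, which we can think of as a ring joined head to tail: each call to NioEventLoopGroup.next() returns the next element of the ring, and that element is a NioEventLoop.
    What determines the size of children? The thread count passed to the NioEventLoopGroup constructor. In the demo, bossGroup asks for 1 thread, while workerGroup passes no count and gets netty's default.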
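
    The ring behaviour of next() can be sketched as below. This is a minimal illustration, not netty's actual chooser: RoundRobinChooser is a hypothetical name, and strings stand in for the NioEventLoop elements.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hands out the elements of a fixed array in a repeating cycle. */
final class RoundRobinChooser<T> {
    private final T[] children;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinChooser(T[] children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children.clone();
    }

    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return children[Math.floorMod(index.getAndIncrement(), children.length)];
    }
}

final class RoundRobinDemo {
    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
        // prints loop-0, loop-1, loop-2, loop-0
    }
}
```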

    Next, let's look at how NioEventLoop is implemented, starting with its constructor.

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
            super(parent, executor, false);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            provider = selectorProvider;
            selector = openSelector();
        }
    
    

    Now look at the implementation of openSelector().

        private Selector openSelector() {
            final Selector selector;
            try {
                selector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEYSET_OPTIMIZATION) {
                return selector;
            }
    
            try {
                SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
                Class<?> selectorImplClass =
                        Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());
    
                // Ensure the current selector implementation is what we can instrument.
                if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                    return selector;
                }
    
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                selectedKeysField.setAccessible(true);
                publicSelectedKeysField.setAccessible(true);
    
                selectedKeysField.set(selector, selectedKeySet);
                publicSelectedKeysField.set(selector, selectedKeySet);
    
                selectedKeys = selectedKeySet;
                logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
            } catch (Throwable t) {
                selectedKeys = null;
                logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
            }
    
            return selector;
        }
    

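    The constructor calls provider.openSelector() to create a multiplexing selector object.
    In the JDK's native NIO implementation, the selector holds a HashSet named selectedKeys that stores the result set of each select call. Unless the optimization is disabled, netty also uses reflection here to replace the selector's internal selectedKeys (and publicSelectedKeys) with its own SelectedSelectionKeySet, and keeps a reference to that object in the event loop. The benefit is that after every call to the Selector's select function, the loop can inspect selectedKeys directly to see whether new events have occurred.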
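
    The reflection trick can be tried out on a toy class. The sketch below is an assumption-laden illustration: FakeSelector and ArrayKeySet are hypothetical stand-ins (on recent JDKs, reflecting into sun.nio.ch itself may be blocked), and strings stand in for SelectionKey objects.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/** Hypothetical selector-like class with a private result set (not a JDK class). */
class FakeSelector {
    private Set<String> selectedKeys = new HashSet<>();

    void select() {
        selectedKeys.add("key-1"); // pretend two keys became ready
        selectedKeys.add("key-2");
    }
}

/** Array-backed set: add() appends, snapshot() hands the entries out and resets. */
final class ArrayKeySet extends AbstractSet<String> {
    private String[] keys = new String[4];
    private int size;

    @Override public boolean add(String key) {
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size * 2);
        }
        keys[size++] = key;
        return true;
    }

    String[] snapshot() {
        String[] out = Arrays.copyOf(keys, size);
        size = 0;
        return out;
    }

    @Override public int size() { return size; }

    @Override public Iterator<String> iterator() {
        return Arrays.asList(Arrays.copyOf(keys, size)).iterator();
    }
}

final class KeySetSwapDemo {
    static String[] run() {
        try {
            FakeSelector selector = new FakeSelector();
            ArrayKeySet ours = new ArrayKeySet();
            Field field = FakeSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, ours); // from now on the selector fills our set
            selector.select();
            return ours.snapshot();    // read the results without asking the selector
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [key-1, key-2]
    }
}
```

    Because the caller holds the very object the selector writes into, it sees the results of each select call without going through the selector's accessor.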

    Other code can hand tasks to a NioEventLoop by calling its execute method. Here is the implementation.

        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp) {
                wakeup(inEventLoop);
            }
        }
    
        private void startThread() {
            synchronized (stateLock) {
                if (state == ST_NOT_STARTED) {
                    state = ST_STARTED;
                    delayedTaskQueue.add(new ScheduledFutureTask<Void>(
                            this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
                            ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
                    doStartThread();
                }
            }
        }
    

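    NioEventLoop keeps a state variable, state, which guarantees that no matter how many times startThread is called, doStartThread is invoked only once. On that first call doStartThread creates a new thread. Here is doStartThread.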

        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        if (state < ST_SHUTTING_DOWN) {
                            state = ST_SHUTTING_DOWN;
                        }
    
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                    "before run() implementation terminates.");
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                synchronized (stateLock) {
                                    state = ST_TERMINATED;
                                }
                                threadLock.release();
                                if (!taskQueue.isEmpty()) {
                                    logger.warn(
                                            "An event executor terminated with " +
                                                    "non-empty task queue (" + taskQueue.size() + ')');
                                }
    
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
        }
    
    

    The key player is executor. Here is its default implementation.

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    
    

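    As you can see, every call to the executor's execute method creates a new thread. Since doStartThread is actually called only once, each NioEventLoop ends up with exactly one thread.

    The new thread eventually calls "SingleThreadEventExecutor.this.run();". We are now very close to the truth. Here is the implementation of run.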
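
    The combination of a thread-per-task executor and a start-once guard can be sketched as follows. LazySingleThreadExecutor is a hypothetical name and the code is a simplified illustration, not netty's SingleThreadEventExecutor; the threadsCreated counter exists only so the demo can observe the result.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class LazySingleThreadExecutor implements Executor {
    private static final int NOT_STARTED = 0, STARTED = 1;

    final AtomicInteger threadsCreated = new AtomicInteger();
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final Object stateLock = new Object();
    private int state = NOT_STARTED;

    // Like ThreadPerTaskExecutor: every call creates a brand-new thread.
    private final Executor threadPerTask = command -> {
        threadsCreated.incrementAndGet();
        Thread t = new Thread(command, "loop");
        t.setDaemon(true);
        t.start();
    };

    @Override
    public void execute(Runnable task) {
        startThread();
        tasks.add(task);
    }

    private void startThread() {
        synchronized (stateLock) {
            if (state == NOT_STARTED) { // only the first caller gets past this check
                state = STARTED;
                threadPerTask.execute(this::runLoop);
            }
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class LazyStartDemo {
    static int run(int taskCount) {
        LazySingleThreadExecutor executor = new LazySingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(done::countDown);
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return executor.threadsCreated.get();
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // 1: five tasks, but only one thread was created
    }
}
```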

        protected void run() {
            for (;;) {
                oldWakenUp = wakenUp.getAndSet(false);
                try {
                    if (hasTasks()) {
                        selectNow();
                    } else {
                        select();
    
                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    }
    
                    cancelledKeys = 0;
    
                    final long ioStartTime = System.nanoTime();
                    needsToSelectAgain = false;
                    if (selectedKeys != null) {
                        processSelectedKeysOptimized(selectedKeys.flip());
                    } else {
                        processSelectedKeysPlain(selector.selectedKeys());
                    }
                    final long ioTime = System.nanoTime() - ioStartTime;
    
                    final int ioRatio = this.ioRatio;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Unexpected exception in the selector loop.", t);
    
                    // Prevent possible consecutive immediate failures that lead to
                    // excessive CPU consumption.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore.
                    }
                }
            }
        }
    

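    The run method is an infinite loop. Each iteration calls select on the Selector to pick up events and handle them, then runs the tasks that have been thrown into the queue.

      1. select()/selectNow(): inside these functions the native Selector's select method is called, so the first step has surfaced. According to the NIO call flow (the detailed code is in the earlier post), the next step should be the ServerSocketChannel calling accept to take the client connection. Let's go find it.
      2. If events occurred during the select call, selectedKeys changes (note that the selectedKeys variable is in fact a reference to the underlying Selector's set of triggered events), and processing enters processSelectedKeysOptimized: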
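
    Stripped of netty's details, a loop of this shape can be written with the plain JDK. The sketch below is a simplification under assumptions: MiniEventLoop is a hypothetical name, no channel is registered (so the I/O step is only a placeholder), and there is no ioRatio or wakenUp handling.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class MiniEventLoop implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private volatile boolean shuttingDown;

    MiniEventLoop() throws IOException {
        selector = Selector.open();
    }

    void execute(Runnable task) {
        tasks.add(task);
        selector.wakeup(); // make a blocked select() return so the task runs soon
    }

    void shutdown() {
        shuttingDown = true;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (!shuttingDown) {
                if (tasks.isEmpty()) {
                    selector.select();    // block until I/O is ready or wakeup() is called
                } else {
                    selector.selectNow(); // tasks are waiting: poll without blocking
                }
                selector.selectedKeys().clear(); // placeholder: I/O handling would go here
                Runnable task;
                while ((task = tasks.poll()) != null) {
                    task.run();
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}

final class MiniEventLoopDemo {
    static int run(int taskCount) {
        try {
            MiniEventLoop loop = new MiniEventLoop();
            Thread thread = new Thread(loop, "mini-loop");
            thread.start();
            AtomicInteger executed = new AtomicInteger();
            CountDownLatch done = new CountDownLatch(taskCount);
            for (int i = 0; i < taskCount; i++) {
                loop.execute(() -> {
                    executed.incrementAndGet();
                    done.countDown();
                });
            }
            done.await();   // wait for the tasks before asking the loop to stop
            loop.shutdown();
            thread.join();
            return executed.get();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // 3
    }
}
```

    The call to wakeup() in execute plays the same role as the wakeup(inEventLoop) call seen earlier: without it, a task submitted while the thread is blocked in select() would wait until the next I/O event.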

        private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
            for (int i = 0;; i ++) {
                final SelectionKey k = selectedKeys[i];
                if (k == null) {
                    break;
                }
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    selectAgain();
                    // Need to flip the optimized selectedKeys to get the right reference to the array
                    // and reset the index to -1 which will then set to 0 on the for loop
                    // to start over again.
                    //
                    // See https://github.com/netty/netty/issues/1523
                    selectedKeys = this.selectedKeys.flip();
                    i = -1;
                }
            }
        }
    

    Going one step further, look at processSelectedKey.

        private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
            } catch (CancelledKeyException e) {
                unsafe.close(unsafe.voidPromise());
            }
        }
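
    The readyOps tests in processSelectedKey above are plain bitmask checks against the JDK's SelectionKey constants. A small sketch makes the values concrete; ReadyOpsDemo, shouldRead and shouldFlush are hypothetical names that only mirror the conditions in the listing.

```java
import java.nio.channels.SelectionKey;

final class ReadyOpsDemo {
    /** Mirrors the first check: read or accept is ready, or the readyOps == 0 workaround. */
    static boolean shouldRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    /** Mirrors the second check: the channel is writable again. */
    static boolean shouldFlush(int readyOps) {
        return (readyOps & SelectionKey.OP_WRITE) != 0;
    }

    public static void main(String[] args) {
        System.out.println(SelectionKey.OP_READ);    // 1
        System.out.println(SelectionKey.OP_WRITE);   // 4
        System.out.println(SelectionKey.OP_CONNECT); // 8
        System.out.println(SelectionKey.OP_ACCEPT);  // 16
        System.out.println(shouldRead(SelectionKey.OP_ACCEPT));                        // true
        System.out.println(shouldRead(SelectionKey.OP_WRITE));                         // false
        System.out.println(shouldFlush(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // true
    }
}
```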
    

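    When a client connects, readyOps should be 16, which corresponds to SelectionKey.OP_ACCEPT. (If OP_READ fires instead, the client's data is read; the next post covers that in detail.) Next, look at the doReadMessages method that unsafe.read() calls.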

            public void read() {
                assert eventLoop().inEventLoop();
                if (!config().isAutoRead()) {
                    removeReadOp();
                }
    
                final ChannelConfig config = config();
                final int maxMessagesPerRead = config.getMaxMessagesPerRead();
                final boolean autoRead = config.isAutoRead();
                final ChannelPipeline pipeline = pipeline();
                boolean closed = false;
                Throwable exception = null;
                try {
                    for (;;) {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
    
                        if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    exception = t;
                }
    
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                pipeline.fireChannelReadComplete();
    
                if (exception != null) {
                    if (exception instanceof IOException) {
                        // ServerChannel should not be closed even on IOException because it can often continue
                        // accepting incoming connections. (e.g. too many open files)
                        closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                    }
    
                    pipeline.fireExceptionCaught(exception);
                }
    
                if (closed) {
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            }
    
        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = javaChannel().accept();
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    

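    So here is accept at last, hidden quite deep. After accept returns, the socket for that client connection is wrapped in a NioSocketChannel, bound to a worker thread via childEventLoopGroup().next(), and added to the list buf. Then we go back up to the read method, which passes each entry of readBuf to pipeline.fireChannelRead.
    Step by step, the call ends up here.

    [Figure: call stack at the point where the SocketChannel's pipeline is set up]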
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                for (Entry<ChannelOption<?>, Object> e: childOptions) {
                    try {
                        if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                            logger.warn("Unknown channel option: " + e);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to set a channel option: " + child, t);
                    }
                }
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                child.unsafe().register(child.newPromise());
            }
    

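    First comes the line "child.pipeline().addLast(childHandler);". Looks familiar, doesn't it? childHandler is the handler object we passed to ServerBootstrap's childHandler method at the beginning. After that, the socket options and attributes are applied to the child channel.

    Now look at the implementation of register.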

            public final void register(final ChannelPromise promise) {
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        promise.setFailure(t);
                    }
                }
            }
    
    

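    This posts a register task to the eventLoop. In the eventLoop (NioEventLoop) thread, which at this point is a thread from workerGroup, the SocketChannel is registered with that eventLoop's selector. Note the difference from our native NIO code: every thread runs its own Selector object to poll for events.

    Next, let's return to the demo program at the beginning and see what bind does.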
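
    The "run it directly if we are already on the loop thread, otherwise queue it" pattern in register can be sketched with the JDK's own single-thread executor. LoopAffinity is a hypothetical name, and a CompletableFuture stands in for netty's ChannelPromise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class LoopAffinity {
    private volatile Thread loopThread;
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "worker-loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread is "the loop"
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** The registration always ends up running on the loop thread. */
    CompletableFuture<String> register(String channelName) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        Runnable register0 = () ->
                promise.complete(channelName + " registered on " + Thread.currentThread().getName());
        if (inEventLoop()) {
            register0.run();         // already on the loop thread: no queueing needed
        } else {
            loop.execute(register0); // hand the work over to the loop thread
        }
        return promise;
    }
}

final class LoopAffinityDemo {
    static String run() {
        try {
            return new LoopAffinity().register("child-1").get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // child-1 registered on worker-loop
    }
}
```

    Funnelling every registration through one thread means the selector is only ever touched by the thread that also calls select on it.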

        public ChannelFuture bind(SocketAddress localAddress) {
            validate(); // check that the parameters are valid
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            return doBind(localAddress);
        }
    

    Now doBind:

        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            final ChannelPromise promise;
            if (regFuture.isDone()) {
                promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                });
            }
    
            return promise;
        }
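
    The branch on regFuture.isDone() above boils down to "bind only after registration has finished". A rough analogue with the JDK's CompletableFuture is sketched below; BindAfterRegister is a hypothetical name, and thenRun covers both branches because it runs at once on an already completed future and otherwise behaves like an added listener.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class BindAfterRegister {
    final List<String> log = new CopyOnWriteArrayList<>();

    /** The returned future completes once "bind" ran, which only happens after registration. */
    CompletableFuture<Void> doBind(CompletableFuture<Void> regFuture, int port) {
        if (regFuture.isCompletedExceptionally()) {
            return regFuture; // registration failed: report that instead of binding
        }
        return regFuture.thenRun(() -> log.add("bind " + port));
    }
}

final class BindAfterRegisterDemo {
    static List<String> run() {
        BindAfterRegister bootstrap = new BindAfterRegister();
        CompletableFuture<Void> regFuture = new CompletableFuture<>();
        CompletableFuture<Void> bound = bootstrap.doBind(regFuture, 8007);
        bootstrap.log.add("registering"); // registration is still pending here
        regFuture.complete(null);         // registration done: the bind step fires now
        bound.join();
        return bootstrap.log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [registering, bind 8007]
    }
}
```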
    
    

    This initializes a Channel and binds it to a boss thread. Let's look further at initAndRegister.

        final ChannelFuture initAndRegister() {
            Channel channel;
            try {
                channel = createChannel();
            } catch (Throwable t) {
                return VoidChannel.INSTANCE.newFailedFuture(t);
            }
    
            try {
                init(channel);
            } catch (Throwable t) {
                channel.unsafe().closeForcibly();
                return channel.newFailedFuture(t);
            }
    
            ChannelPromise regFuture = channel.newPromise();
            channel.unsafe().register(regFuture);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }
    
    

    This breaks down further into three steps: createChannel, init and register.

        Channel createChannel() {
            EventLoop eventLoop = group().next();
            return channelFactory().newChannel(eventLoop, childGroup);
        }
    
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options();
            synchronized (options) {
                channel.config().setOptions(options);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
            if (handler() != null) {
                p.addLast(handler());
            }
    
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                            currentChildAttrs));
                }
            });
        }
    

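    As createChannel shows, the Channel type set through ServerBootstrap.channel now comes into play. A NioEventLoop from bossGroup is bound to the newly created channel. Why is workerGroup passed along as well? Because the client connections this ServerChannel accepts must be handed off to a worker for processing.

    The init function applies the options and attributes, and adds the corresponding ChannelHandlers to the ServerChannel's pipeline.

    Next, let's focus on the implementation of register.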

            public final void register(final ChannelPromise promise) {
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        promise.setFailure(t);
                    }
                }
            }
    
    
            private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!ensureOpen(promise)) {
                        return;
                    }
                    doRegister();
                    registered = true;
                    promise.setSuccess();
                    pipeline.fireChannelRegistered();
                    if (isActive()) {
                        pipeline.fireChannelActive();
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    if (!promise.tryFailure(t)) {
                        logger.warn(
                                "Tried to fail the registration promise, but it is complete already. " +
                                        "Swallowing the cause of the registration failure:", t);
                    }
                }
            }
    

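    At last we see the call to eventLoop.execute. Because the caller here is not the EventLoop's own thread, the else branch runs and the task goes through execute. Combined with our earlier analysis of NioEventLoop, the first call creates a new thread, which runs the run method of the submitted Runnable and so finally performs the ServerChannel's registration. Note that the promise passed in is a future object: once registration succeeds, other threads can check through it whether the operation has completed.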

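    To sum up:

    ServerBootstrap is configured with two thread groups, bossGroup and workerGroup. Each thread in them has its own selector and loops on select to find the events it is listening for. In the normal case there is only one listening port, so only one thread in bossGroup is doing any work.
    The boss thread's selector has only one ServerSocketChannel registered. When it accepts a client connection, it calls next() on the worker thread group to obtain a NioEventLoop and hands the SocketChannel to that worker, which runs the rest of the logic.
    NioEventLoop also has an execute method, which lets other threads throw Runnable tasks to its internal thread. The main uses are the boss thread registering a newly accepted channel with the worker group, and the user thread registering the serverChannel with a boss thread when it calls ServerBootstrap's bind.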
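
    The whole picture can be compressed into a small plain-NIO sketch: a boss loop that only accepts, and a worker loop that registers the accepted channel on its own selector and echoes data back. This is an illustration under assumptions, not netty code: SelectorLoop and KeyHandler are hypothetical names, there is a single worker, and writes are assumed to complete for small messages.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical callback stored as the key attachment. */
interface KeyHandler {
    void handle(SelectionKey key) throws IOException;
}

/** One thread, one Selector, one task queue: the role NioEventLoop plays. */
final class SelectorLoop extends Thread {
    final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();

    SelectorLoop(String name) throws IOException {
        super(name);
        setDaemon(true);
        selector = Selector.open();
    }

    void execute(Runnable task) { // lets another thread hand work to this loop
        tasks.add(task);
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            for (;;) {
                selector.select();
                for (SelectionKey key : selector.selectedKeys()) {
                    ((KeyHandler) key.attachment()).handle(key);
                }
                selector.selectedKeys().clear();
                for (Runnable task; (task = tasks.poll()) != null; ) {
                    task.run();
                }
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}

final class BossWorkerDemo {
    static String echoOnce(String message) {
        try {
            SelectorLoop boss = new SelectorLoop("boss");
            SelectorLoop worker = new SelectorLoop("worker");
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            KeyHandler echo = key -> {                 // runs on the worker thread
                SocketChannel ch = (SocketChannel) key.channel();
                ByteBuffer buf = ByteBuffer.allocate(256);
                if (ch.read(buf) < 0) { ch.close(); return; }
                buf.flip();
                while (buf.hasRemaining()) ch.write(buf);
            };
            KeyHandler accept = key -> {               // runs on the boss thread
                SocketChannel child = server.accept();
                if (child == null) return;
                child.configureBlocking(false);
                worker.execute(() -> {                 // registration runs on the worker
                    try {
                        child.register(worker.selector, SelectionKey.OP_READ, echo);
                    } catch (IOException e) {
                        throw new IllegalStateException(e);
                    }
                });
            };
            server.register(boss.selector, SelectionKey.OP_ACCEPT, accept);
            boss.start();
            worker.start();
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            try (Socket client = new Socket("127.0.0.1", port)) {
                byte[] out = message.getBytes(StandardCharsets.UTF_8);
                client.getOutputStream().write(out);
                byte[] in = new byte[out.length];
                new DataInputStream(client.getInputStream()).readFully(in);
                return new String(in, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(echoOnce("ping")); // ping
    }
}
```

    The server channel is registered before the boss thread starts and the child channel is registered from inside the worker thread, so neither selector is modified by a foreign thread while it is selecting.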

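    Topics left to explore

    Plenty of questions remain: how is message splitting and framing implemented, how does data flow through a ChannelHandler, and how do multiple ChannelHandlers work together? Those are left for the next round.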

    Article link: https://www.haomeiwen.com/subject/itbjgftx.html