美文网首页
【Netty】Netty的启动过程二

【Netty】Netty的启动过程二

作者: 小圣996 | 来源:发表于2020-01-11 16:45 被阅读0次

    在上篇文章《Netty的启动过程一》中,我们讲述了Netty服务端boss线程的启动过程,但是worker线程是如何启动的还是未知的。我们知道了boss线程是在ServerBootstrap的bind方法中启动的,再回到上篇文章中Netty的启动代码段,在NioEventLoopGroup的初始化方法和ServerBootstrap的bind方法中间还隔了很多代码,这些源码都还没看的,我们现在来看看这些源码。

    继NioEventLoopGroup初始化后,服务端便创建了一个ServerBootstrap实例,这个类是服务端Netty特有的启动类,客户端的为Bootstrap;接下来便把boss线程组和worker线程组分别赋给了ServerBootstrap的group和childGroup变量,注意worker线程组是赋给了childGroup;接下来便是设置一些参数,比如channel,option,childOption,handler,childHandler,注意带child的和没带child的区别:带child的基本是设置 ServerChannel 的子 channel 的选项,即没带child的基本都是对boss线程而言的,而带child的基本都是对worker线程而言的。
    这里需要注意channel(NioServerSocketChannel.class)一句,它是指设置boss线程channel类型。

    接下来要了解下Netty的ChannelPipeline和ChannelHandler的关系了,这里引用《游戏之网络进阶》的一幅图:

    数据在ChannelPipeline中流程.png

    pipeline 是一个负责处理网络事件的职责链,负责管理和执行 ChannelHandler,即负责消息入站和出站的流程。

    在上篇文章中,我们知道了在启动boss线程后,虽然boss线程在for循环中无限循环,但是是没有进入到后面的if分支的SelectionKey.OP_ACCEPT中的,只有先进了这里,才会启动服务端的worker线程:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            try {
                int readyOps = k.readyOps();
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        return;
                    }
                }
                ...
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
    }
    

    因此,我们再把断点打在threadFactory.newThread(command).start()中,然后启动客户端去连接服务端,看下它的调用堆栈是怎样的:

    客户端连接启动worker线程.png

    从上篇可知,当每次有客户端连接时,此时readyOps=16,继而启动worker线程;每次读取客户端数据时,此时readyOps=1,继而worker线程读取数据;很明显,Netty是以readyOps的值区分连接和读写数据的,那么readyOps又是如何设置的呢?看代码,readyOps取自于SelectionKey,而SelectionKey取自于SelectionKey[]数组,而SelectionKey[]

        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized(selectedKeys.flip());
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    

    来自selectedKeys.flip(),flip()实现如下:

        SelectionKey[] flip() {
            if (isA) {
                isA = false;
                keysA[keysASize] = null;
                keysBSize = 0;
                return keysA;
            } else {
                isA = true;
                keysB[keysBSize] = null;
                keysASize = 0;
                return keysB;
            }
        }
    

    即SelectionKey[]来自keysA或keysB地址,而上述processSelectedKeys方法处于NioEventLoop的无限循环中,即boss线程(worker线程)的无限循环中:

        @Override
        protected void run() {
            for (;;) {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            ...
                    }
                    processSelectedKeys();
            }
        }
    

    也就是说,boss线程在无限循环SelectionKey[]即keysA或keysB的值,当读到SelectionKey不为空时,也就读到了readyOps值,根据readyOps值,就知道客户端是什么操作了,证据如下:

        private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
            for (int i = 0;; i ++) {
                final SelectionKey k = selectedKeys[i];
                if (k == null) {
                    break;
                }
                selectedKeys[i] = null;
                final Object a = k.attachment();
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                }
            }
        }
    

    现在知道了worker线程启动和读写数据跟这个readyOps值有关,那这个值又是如何设置进去的呢?我们已知SelectionKey[]来自于keysA或keysB,那么我们全局搜索这两个变量看怎么用的,就知道它是如何设置值的了。

    keysA全局引用.png

    可见,keysA或keysB唯一设置值的地方是在add方法中,因此我们在add方法中打上断点,启动客户端去连接,就应该知道SelectionKey[]值是如何设置的了。

    SelectionKey[]设置SelectionKey的readyOps为16.png

    果然,当客户端请求连接服务端时,在boss线程中,进入了此断点,而且SelectionKey的readyOps设置成了16,后续在processSelectedKey方法中,boss线程就是根据此readyOps值再启动worker线程的。而且由调用堆栈可知,它正是在boss无限循环的run()方法中进入了select(wakenUp.getAndSet(false))方法,查询是否有就绪的IO事件(读写,连接等),有即设置keysA或keysB的SelectionKey值。而这些SelectionKey值是Netty监听到了这些IO事件,封装进SelectionKey的。根据操作系统的不同而封装过程不同。

    Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
    当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。
    摘自:《新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析

    同理,当客户端发数据给服务端时,也进入了此断点,而且SelectionKey的readyOps设置成了1,只是此时是在worker线程中了。

    SelectionKey[]设置SelectionKey的readyOps为1.png

    现在知道了worker线程启动的原因,但是过程是怎样的呢?

    我们仍在threadFactory.newThread(command).start()处打上断点,由上篇可知,第一次进入此断点,Netty启动了boss线程,第二次进入此断点即启动了worker线程,现在我们来看下第二次进入此断点的情况(请查看上图->客户端连接启动worker线程.png):

    由图片堆栈打印所知,在boss线程中,首先由readyOps=16,进入了NioMessageUnsafe.read()方法,如下:

            @Override
            public void read() {
                ...
                do {
                    //读取SocketChannel消息/事件,封装进readBuf,实际上是封装worker线程channel,供后续worker线程注册此channel
                    int localRead = doReadMessages(readBuf);
                    ...
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
                    
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //处理readBuf事件,实际上是为worker线程添加新channel,初始化childHandler,pipeline及参数等信息
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
            }
    

    这里有两个重要方法,一为doReadMessages(readBuf),主要是封装NioSocketChannel,以供worker线程添加channel和监听SelectionKey.OP_READ事件用:

        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = javaChannel().accept();
    
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
            return 0;
        }
    

    再看boss线程中NioSocketChannel继承关系:

        public NioSocketChannel(Channel parent, SocketChannel socket) {
            super(parent, socket);
            config = new NioSocketChannelConfig(this, socket.socket());
        }
    

    NioSocketChannel继承自AbstractNioByteChannel,注意在这里先定义了SelectionKey.OP_READ操作,以供worker线程监听此事件:

        protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
            super(parent, ch, SelectionKey.OP_READ);//为后续worker线程监听SelectionKey.OP_READ事件
        }
    

    另一为pipeline.fireChannelRead(readBuf.get(i))方法,在经历NioServerSocketChannel的pipeline中首尾handler的read方法,最终来到了ServerBootstrapAcceptor的

            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
                child.pipeline().addLast(childHandler);
                for (Entry<ChannelOption<?>, Object> e: childOptions) {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                }
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            }
    

    由此,在childGroup.register(child)中,注册了此channel(NioSocketChannel),并设置了pipeline,参数等其他信息。

    boss线程中的childGroup.png

    此后,在后续的register方法中,由eventLoop.execute方法,启动了worker线程,也是由MultithreadEventLoopGroup中的register方法,以next()限制了worker线程数量。

        @Override
        public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }
    
            @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                AbstractChannel.this.eventLoop = eventLoop;//将channel和eventLoop关联起来,即将channel和worker线程关联起来
    
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                }
            }
    

    并在register0方法中,将netty的niochannel绑定到java原生的selectkey参数上,并告知worker线程pipeline各handler channel的注册和激活事件。

            private void register0(ChannelPromise promise) {
                try {
                    boolean firstRegistration = neverRegistered;
                    doRegister();//将netty的niochannel绑定到java原生的selectkey参数上
                    neverRegistered = false;
                    registered = true;
    
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();//告知pipeline中各handler有channel注册
    
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();//告知pipeline中各handler有channel激活
                        } else if (config().isAutoRead()) {
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                }
            }
    

    看doRegister()方法,在AbstractNioChannel下内部抽象类AbstractNioUnsafe的doRegister()方法中:

        @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    //如果触发了读事件的SelectKey,netty通过调用 SelectKey的attachment()方法就可以获取channel了
                    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                    return;
                } catch (CancelledKeyException e) {
                }
            }
        }
    

    将netty的channel绑定到java原生的selectkey参数上,如果触发了读事件的SelectKey,netty通过调用 SelectKey的attachment()方法就可以获取channel了(见processSelectedKeysOptimized方法k.attachment())。
    现在,worker线程如何启动的也知道了,那么worker线程是如何读取数据的呢?

    这次,我们把断点打在if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)一句,然后启动客户端连接服务端并给服务端发数据,这时堆栈为:

    服务端读取客户端数据时

    把该图与上面“客户端连接启动worker线程.png”对比,启动worker线程前,readyOps=16,此时是在boss线程中,实际用的unsafe是NioMessageUnsafe.read();读取客户端数据时,readyOps=1,此时是在worker线程中,实际用的是NioByteUnsafe.read()。此后,经历worker线程的pipeline,将数据发至用户自定义的handler,这便完成了对客户端数据的读取。

    那NioMessageUnsafe是如何来的呢?
    其实NioMessageUnsafe来自ServerBootstrap的bind方法,跟下去,在AbstractBootstrap的initAndRegister()方法中,调用channelFactory.newChannel()方法用反射实例化了boss线程的NioServerSocketChannel。

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                ...
            }
    
            ChannelFuture regFuture = config().group().register(channel);
            ...
            return regFuture;
        }
    

    证据如下,在初始化ServerBootstrap时,有这样一句bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class),它是指设置boss线程channel类型。

        public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
    

    在上面设置了ServerBootstrap的channelFactory,反射类为NioServerSocketChannel,再以newChannel()方法实例化了NioServerSocketChannel,最终会来到这里:

        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
    
        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
    

    在这里设置了boss线程将监听SelectionKey.OP_ACCEPT事件,再看它的super方法,NioServerSocketChannel继承自AbstractNioMessageChannel,而AbstractNioMessageChannel也继承自AbstractNioChannel,AbstractNioChannel又继承自AbstractChannel,最终也会来到这里:

          protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    

    在这里unsafe = newUnsafe(),调用本身抽象方法newUnsafe()实例化了本身Unsafe属性,从以上的继承关系链中有个AbstractNioMessageChannel类,因此此处实际调用的是AbstractNioMessageChannel的newUnsafe() 方法,该方法中new了一个内部类NioMessageUnsafe实例,该内部类继承了AbstractNioUnsafe。NioMessageUnsafe即来自于此。

        @Override
        protected AbstractNioUnsafe newUnsafe() {
            return new NioMessageUnsafe();
        }
    
        private final class NioMessageUnsafe extends AbstractNioUnsafe {
            private final List<Object> readBuf = new ArrayList<Object>();
    
            @Override
            public void read() {
                ...
                do {
                    //读取SocketChannel消息/事件,封装进readBuf,实际上是封装worker线程channel,供后续worker线程注册此channel
                    int localRead = doReadMessages(readBuf);
                    ...
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
                    
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //处理readBuf事件,实际上是为worker线程添加新channel,初始化childHandler,pipeline及参数等信息
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
            }
        }
    

    NioByteUnsafe又是如何来的呢?
    其实NioByteUnsafe来自于NioMessageUnsafe.read()方法,该方法中有两个重要方法之一doReadMessages(readBuf),作用主要是封装NioSocketChannel,以供worker线程添加channel和监听SelectionKey.OP_READ事件用,我们在前面将它跟踪至了AbstractNioByteChannel,继续跟下去会发现AbstractNioByteChannel又继承自AbstractNioChannel:

        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            ch.configureBlocking(false);
        }
    

    AbstractNioChannel继承自AbstractChannel:

        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    

    在这里unsafe = newUnsafe(),调用本身抽象方法newUnsafe()实例化了本身Unsafe属性,从以上的继承关系链中有个NioSocketChannel类,因此此处实际调用的是NioSocketChannel的newUnsafe() 方法,该方法中new了一个内部类NioSocketChannelUnsafe实例,该内部类继承了NioByteUnsafe。NioByteUnsafe即来自于此。

        @Override
        protected AbstractNioUnsafe newUnsafe() {
            return new NioSocketChannelUnsafe();
        }
    
        private final class NioSocketChannelUnsafe extends NioByteUnsafe {
        }
    

    这样,worker线程的启动过程也讲完了。

    包括上篇文章《Netty的启动过程一》,大致讲解了Netty服务端是如何启动boss线程和worker线程的,如何读取数据的,但也仅是主要的枝干代码,细节之处还有很多没讲全,还有很多重要组件,它们的功能及实现都没讲的。这两篇文章的主要目的,是以一个Netty新手的角度讲解如何看Netty源码,那就是大胆去猜,去验证,去查资料,去看别人思路,还有就是多打断点去调试,不要想着一次全搞懂,而是多看多查多验证去弥补以前没看到的,没看懂的,并不断纠正以前错误认识的,所谓Netty之大,一锅炖不下,其余的只能在后续文章慢慢讲解了,这里先弄懂个大概即可。

    相关文章

      网友评论

          本文标题:【Netty】Netty的启动过程二

          本文链接:https://www.haomeiwen.com/subject/fnksactx.html