美文网首页
netty源码分析(12)- 新连接接入

netty源码分析(12)- 新连接接入

作者: Jorgezhong | 来源:发表于2019-02-22 12:39 被阅读0次

    在研究NioEventLoop执行过程的时候,检测IO事件(包括新连接)处理IO事件执行所有任务三个过程。其中检测IO事件中通过持有的selector去轮询事件,检测出新连接。这里复用同一段代码。

    今天我们研究的新连接介入的过程大概如下:

    1. 检测新连接
    2. 检测新连接之后,创建NioSocketChannel,也就是客户端 channel
    3. 接着给channel分配一个NioEventLoop,并且把该channel注册到NioEventLoop对应的selector上。至此,这条channel之后的读写都由该NioEventLoop进行管理。
    4. 最后向selector注册读写事件,注册的时候和服务端启动注册accept事件复用同一段逻辑。

    netty的多连接复用指的是,多个连接父用一个NioEventLoop持有的线程。
    netty服务端在启动的时候会绑定一个bossGroup,即NioEventLoop,在bind()绑定端口的时候注册accept(新连接接入)事件。扫描到该事件后,便处理。因此入口从:NioEventLoop#processSelectedKeys()开始

        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    
        private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {
                final SelectionKey k = selectedKeys.keys[i];
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.keys[i] = null;
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    //真正的处理过程
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    

    真正的入口NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel),改方法整体逻辑我们前面分析过在《NioEventLoop执行之processSelectedKeys()》章节中有介绍,这里我们直接看,新连接处理的逻辑

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            //省略代码
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            //如果当前NioEventLoop是workGroup 则可能是OP_READ,bossGroup是OP_ACCEPT
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    
                //新连接接入以及读事件处理入口
                unsafe.read();
            }
          }
    

    这里的unsafe是在《Channel创建过程》的时候,调用了父类AbstractChannel#AbstractChannel()的构造方法,和pipeline一起初始化的。

        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    

    unsalfNioServerSockeChannel的父类AbstractNioMessageChannel#newUnsafe()创建,可以看到对应的是AbstractNioMessageChannel.NioMessageUnsafe,内部类

        @Override
        protected AbstractNioUnsafe newUnsafe() {
            return new NioMessageUnsafe();
        }
    

    查看该类的read()方法,其大致流程如下:

    1. 循环调用jdk底层的代码创建channel,并用netty的NioSocketChannel包装起来,代表新连接成功接入一个通道。
    2. 将所有获取到的channel存储到一个容器当中,检测接入的连接数,默认是一次接16个连接
    3. 遍历容器中的channel,依次调用方法fireChannelReadfireChannelReadCompletefireExceptionCaught来触发对应的传播事件。
        private final class NioMessageUnsafe extends AbstractNioUnsafe {
            //临时存储读到的连接
            private final List<Object> readBuf = new ArrayList<Object>();
    
            @Override
            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
    
                //服务端接入速率处理器
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        //while循环调用doReadMessages()创建新连接对象
                        do {
                            //获取jdk底层的channel,并加入readBuf容器
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
                            //把读到的连接做一个累加totalMessages,默认最多累计读取16个连接,结束循环
                            allocHandle.incMessagesRead(localRead);
                            
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
                    
                    //触发readBuf容器内所有的传播事件:ChannelRead 读事件
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    //清空容器
                    readBuf.clear();
                    allocHandle.readComplete();
                    //触发传播事件:ChannelReadComplete,所有的读事件完成
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
                        //触发传播事件:exceptionCaught,触发异常
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    
    • 获取jdk底层的channel,调用的是NioServerSocketChannel#doReadMessages(),创建jdk底层channel并且用NioSocketChannel包装起来,将该channel添加到传入的容器保存起来,同时返回一个计数。
        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            //获取jdk底层的channel
            SocketChannel ch = SocketUtils.accept(javaChannel());
    
            try {
                if (ch != null) {
                    //将jdk底层的channel封装到netty的channel,并存储到传入的容器当中
                    buf.add(new NioSocketChannel(this, ch));
                    //成功和创建 客户端接入的一条通道,并返回
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    

    下面这段代码allocHandle是一个服务器接入速率处理器,其实是DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle,通过incMessagesRead()方法维持一个成员变量totalMessages,与continueReading()方法配合,控制一个while循环接入的连接最大数。循环获取了一个批次的连接之后再统一处理这部分连接。

            //服务端接入速率处理器
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    
            @Override
            public final void incMessagesRead(int amt) {
                totalMessages += amt;
            }
    
            @Override
            public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
                return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                        //判断读取到的连接总数是否大于最大连接数,maxMessagePerRead默认16
                       totalMessages < maxMessagePerRead &&
                       totalBytesRead > 0;
            }
    
    

    相关文章

      网友评论

          本文标题:netty源码分析(12)- 新连接接入

          本文链接:https://www.haomeiwen.com/subject/yjesyqtx.html