Netty源码分析之accept过程

作者: 美团Java | 来源:发表于2016-08-09 17:28 被阅读7558次

    本章节分析服务端如何accept客户端的connect请求。

    Netty源码分析之NioEventLoop章节中,已经分析了NioEventLoop的工作机制,当有客户端connect请求,selector可以返回其对应的SelectionKey,方法processSelectedKeys进行后续的处理。

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    

    默认采用优化过的SelectedSelectionKeySet保存有事件发生的selectedKey。
    1、SelectedSelectionKeySet内部使用两个大小为1024的SelectionKey数组keysA和keysB保存selectedKey。
    2、把SelectedSelectionKeySet实例映射到selector的原生selectedKeys和publicSelectedKeys。

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null;
    
            final Object a = k.attachment();
    
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }
    
                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }
    

    因为selector的I/O多路复用机制,一次可以返回多个selectedKey,所以要用for循环处理全部selectionKey。

    假设这时有请求进来,selectedKeys中就存在一个selectionKey,这块逻辑不清楚的可以回头看看深入浅出Nio Socket
    1、通过k.attachment()可以获取ServerSocketChannel注册时绑定上去的附件,其实这个附件就是ServerSocketChannel自身。
    2、如果selectedKey的附件是AbstractNioChannel类型的,执行processSelectedKey(k, (AbstractNioChannel) a)方法进行下一步操作。

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
    
        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
    
                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

    1、获取ServerSocketChannel的unsafe对象。
    2、当前selectionKey发生的事件是SelectionKey.OP_ACCEPT,执行unsafe的read方法。

    该read方法定义在NioMessageUnsafe类中:

    private final List<Object> readBuf = new ArrayList<Object>();
    
    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // ChannelConfig.setAutoRead(false) was called in the meantime
            removeReadOp();
            return;
        }
    
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        final ChannelPipeline pipeline = pipeline();
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                for (;;) {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
    
                    // stop reading and remove op
                    if (!config.isAutoRead()) {
                        break;
                    }
    
                    if (readBuf.size() >= maxMessagesPerRead) {
                        break;
                    }
                }
            } catch (Throwable t) {
                exception = t;
            }
            setReadPending(false);
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                pipeline.fireChannelRead(readBuf.get(i));
            }
    
            readBuf.clear();
            pipeline.fireChannelReadComplete();
    
            if (exception != null) {
                if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                    // ServerChannel should not be closed even on IOException because it can often continue
                    // accepting incoming connections. (e.g. too many open files)
                    closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                }
    
                pipeline.fireExceptionCaught(exception);
            }
    
            if (closed) {
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
    

    1、readBuf 用来保存客户端NioSocketChannel,默认一次不超过16个。
    2、方法doReadMessages进行处理ServerSocketChannel的accept操作。

    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }
    

    1、javaChannel()返回NioServerSocketChannel对应的ServerSocketChannel。
    2、ServerSocketChannel.accept返回客户端的socketChannel 。
    3、把 NioServerSocketChannel 和 socketChannel 封装成 NioSocketChannel,并缓存到readBuf。
    4、遍历redBuf中的NioSocketChannel,触发各自pipeline的ChannelRead事件,从pipeline的head开始遍历,最终执行ServerBootstrapAcceptor的channelRead方法。

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
        child.pipeline().addLast(childHandler);
        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    

    1、child.pipeline().addLast(childHandler)添加childHandler到NioSocketChannel的pipeline。
    其中childHandler是通过ServerBootstrap的childHandler方法进行配置的,和NioServerSocketChannel类似,NioSocketChannel在注册到selector后会触发其pipeline的fireChannelRegistered方法,并执行initChannel方法,为NioSocketChannel的pipeline添加更多自定义的handler,进行业务处理。
    2、childGroup.register(child)将NioSocketChannel注册到work的eventLoop中,这个过程和NioServerSocketChannel注册到boss的eventLoop的过程一样,最终由work线程对应的selector进行read事件的监听。

    当readBuf中缓存的NioSocketChannel都处理完成后,清空readBuf,并触发ChannelReadComplete。

    到此为止,一次accept流程已经执行完。

    END。
    我是占小狼。
    在魔都艰苦奋斗,白天是上班族,晚上是知识服务工作者。
    如果读完觉得有收获的话,记得关注和点赞哦。
    非要打赏的话,我也是不会拒绝的。

    相关文章

      网友评论

      • 橡树人:谢谢狼哥这么深入的分享,这种分析思路对我启发很大,请教个问题:一个boss线程理论上可以接受的连接数有多大?有哪些影响因素呢?我想到的是系统内存;JVM参数配置;一个连接占用的资源;具体的计算还是有点模糊。
        美团Java:@橡树人 这种都是需要压测的,一般10w的连接数没问题
      • hongrm:ServerBootstrapAcceptor 一直都不知道这个类怎么来的,原来在serverbootstrap 初始化时就加入进去了,一次accept过程终于清晰多了
        美团Java:@hongrm :+1:
      • 齐德隆咚强:写的不错!
      • 简书闪电侠:果然是越深入的文章,留言的人越少啊!
        4386ebe46b38:写的很吊!
        美团Java:@the_flash 嗯,受众不广

      本文标题:Netty源码分析之accept过程

      本文链接:https://www.haomeiwen.com/subject/jlprsttx.html