美文网首页
Netty笔记-接受请求

Netty笔记-接受请求

作者: 兴浩 | 来源:发表于2018-07-16 23:16 被阅读48次

    Netty笔记-Channel的Register

    这一篇介绍了如何Channel如何接受SelectionKey.OP_ACCEPT的流程,接下就是接受SelectionKey.OP_READ的流程

    1.NioEventLoop的processSelectedKey

    当readyOps为SelectionKey.OP_ACCEPT则进入AbstractNioMessageChannel中的unsafe的read流程

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
             int readyOps = k.readyOps();
             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
        }
    

    2. 回顾ServerSocketChannel的accept方法

    我们先回顾下nio中ServerSocketChannel的accept方法,在接受到SelectionKey.OP_ACCEPT时,调用该方法获取新连接的SocketChannel,并将SelectionKey.OP_READ注册到selector当中去,Netty中封装势必有这样处理的代码

    private void handleInput(final SelectionKey key) throws IOException {
            if (key.isValid()) {
                // 处理新接入的请求消息
                if (key.isAcceptable()) {
                    // Accept the new connection
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    // Add the new connection to the selector
                    sc.register(selector, SelectionKey.OP_READ);
                }
            }
        }
    

    2. NioMessageUnsafe的read方法

        private final class NioMessageUnsafe extends AbstractNioUnsafe {
    
            private final List<Object> readBuf = new ArrayList<Object>();
    
            @Override
            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    

    read方法最需要关注的readBuf,readBuf是NioSocketChannel的数组,其由doReadMessages处理获取,doReadMessages中调用了SocketUtils.accept方法,其内部调用了ServerSocketChannel的accept方法,NioSocketChannel对ServerSocketChannel进行了包装

    @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = SocketUtils.accept(javaChannel());
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    
       //SocketUtils
        public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
            try {
                return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                    @Override
                    public SocketChannel run() throws IOException {
                        return serverSocketChannel.accept();
                    }
                });
            } catch (PrivilegedActionException e) {
                throw (IOException) e.getCause();
            }
        }
    

    3.NioSocketChannel

    NioSocketChannel调用了父类AbstractNioByteChannel的构造函数,其监听的事件是SelectionKey.OP_READ

        protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
            super(parent, ch, SelectionKey.OP_READ);
        }
    

    4.ServerBootstrapAcceptor中register

    ServerBootstrapAcceptor是在ServerBootstrap初始化时作为一个ChannelInboundHandler添加到Pineline中,其用于处理接收的Channel

    在channelRead方法中调用了EventLoopGroup的register方法,将Channel注册到work线程中(监听SelectionKey.OP_READ事件)

     @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                setChannelOptions(child, childOptions, logger);
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
    

    参考:
    Netty 接受请求过程源码分析 (基于4.1.23)
    Netty源码分析-Server端启动

    相关文章

      网友评论

          本文标题:Netty笔记-接受请求

          本文链接:https://www.haomeiwen.com/subject/pzeipftx.html