美文网首页
6. Netty解析:服务端如何接受连接并后续处理读写事件

6. Netty解析:服务端如何接受连接并后续处理读写事件

作者: 饿了就下楼 | 来源:发表于2020-02-12 11:12 被阅读0次

    前言

      之前的文章分析到了服务端NioServerSocketChannel的创建注册及注册accept事件。到现在为止,关于服务端,我们还有多个疑问未解开:例如当有客户端连接过来时,服务端要怎么处理,以及后续的读写如何进行的?之前的分析都是在服务端的父事件循环组中,那么子事件循环组又是怎么起作用的?之前提到了子处理器childHandler被封装在了ServerBootStrapAcceptor中添加到了流水线,那么子处理器又是怎么工作的,有没有被添加到pipeline中?本文将对这些问题进行解答。

    服务端接收到连接后做了什么

      我们不得不再次回到下面这个方法,当有read或者accept事件到来时,执行unsaf.read()(对于连接事件和read事件,因为其所对应的通道一个是NioServerSocketChannel一个是NioSocketChannel,两者的unsafe是不同的,对于连接事件,unsafe.read位于AbstractNioUnsafe中)。

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                 
                    return;
                }
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
               if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    
        @Override
        public void read() {
            /*省略代码*/
            try {
                try {
                    for (;;) {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
    
                        // stop reading and remove op
                        if (!config.isAutoRead()) {
                            break;
                        }
    
                        if (readBuf.size() >= maxMessagesPerRead) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    exception = t;
                }
                setReadPending(false);
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                pipeline.fireChannelReadComplete();
                /*省略代码*/
            } finally {
                /*省略代码*/
            }
        }
    
    
        /*进行连接的接收*/
        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = SocketUtils.accept(javaChannel());
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    
    

      在doReadMessages方法中,会通过NioServerSocketChannel内部的ServerSocketChannel来完成连接的接受,如果此时有连接进来,那么会生成SocketChannel实例,netty将其封装为自身的NioSocketChannel实例加入到缓冲buf中。随后会将缓冲区中内容(新接收的连接)通过fireChannelRead进行流水线传输。可以发现,NioServerSocketChannel中的ServerSocketChannel在接收到连接后会通过NioServerSocketChannel中的流水线的channelRead方法进行传输。
      随后流水线channelRead执行到ServerBootstrapAcceptor。

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
            setChannelOptions(child, childOptions, logger);
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
    

      由于我们已经知道传入的msg是一个netty已接收连接的通道(实际封装了SocketChannel的NioSocketChannel),获取到它的pipeline,并将childHandler添加到它的流水线中。如果我们给其配置的childHandler是ChannelInitializer的话,那么随后还会将其他的处理器添加到流水线中。同样的,新接收的连接还没有注册,所以也需要将其注册,只不过在server端,新接受的连接注册到childGroup,也就是子事件循环组中,最终它会被注册到子事件循环组中的某个NioEventLoop上,它之后的读写的事件操作就与之前分析客户端时相似。
       在netty中NioServerSocketChannel负责接收连接,它注册在父NioEventLoopGroup中,而且他的通道的handler最后是一个ServerBootStrapAcceptor处理器。在接收到新连接时,出发fireChannelRead回调方法的执行,当执行到ServerBootStrapAcceptor时,ServerBootStrapAcceptor将子处理器加入到新连接的通道的流水线中。那么以Echo Server这个例子来看,两种通道的流水线分别是:

    1 NioServerSocketChannel的流水线(在前文已经见到过)


    2 新连接到来后NioSocketChannel的流水线

      

    *链接

    1. Netty解析:第一个demo——Echo Server
    2. Netty解析:NioEventLoopGroup事件循环组
    3. Netty解析:NioSocketChannel、NioServerSocketChannel的创建及注册
    4. Netty解析:Handler、Pipeline大动脉及其在注册过程中体现
    5. Netty解析:connect/bind方法背后
    6. Netty解析:服务端如何接受连接并后续处理读写事件

    相关文章

      网友评论

          本文标题:6. Netty解析:服务端如何接受连接并后续处理读写事件

          本文链接:https://www.haomeiwen.com/subject/irbsnctx.html