美文网首页
netty系列之(四)——新连接接入

netty系列之(四)——新连接接入

作者: 康康不遛猫 | 来源:发表于2018-12-12 21:12 被阅读0次

    Netty新连接接入处理逻辑:

    • 检测新连接:新连接通过服务端Channel绑定的Selector轮询OP_ACCEPT事件
    • 创建NioSocketChannel:基于JDK的Nio Channel创建NioSocketChannel即客户端Channel
    • 分配线程及注册Selector:提供客户端Channel分配NioEventLoop,并且将Channel注册到NioEventLoop对应的Selector,Channel读写由NioEventLoop管理
    • 向Selector注册读事件


      NioEventLoop

    一、检测新连接:

    • processSelectedKey(key,channel)[入口]
    • NioMessageUnsafe.read():获取服务端Channel的Config和Pipeline,allocHandle处理接入数据读取Channel,调用continueReading()判断总连接数是否超过每次最大读的连接数
    • doReadMessages()[while循环]:调用javaChannel()获取Channel,使用accept()获取SocketChannel封装成NioSocketChannel放到连接readBuf
    • javaChannel().accept()
    #NioEventLoop的processSelectedKey方法
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    
    private final class NioMessageUnsafe extends AbstractNioUnsafe {
    
            private final List<Object> readBuf = new ArrayList<Object>();
    
            @Override
            public void read() {
                assert eventLoop().inEventLoop();//确保在本eventLoop中
                final ChannelConfig config = config();//服务端Channel的config
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);//循环读取,readBuf临时存储接入的NioSocketChannel
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);//计数,totalMessages 
                        } while (allocHandle.continueReading());//是否继续读取
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));//传递读事件
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();//读结束
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);//传递异常
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    
    
    
    #MaxMessageHandle的continueReading
    @Override
    public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
    }
    
    @Override
    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
                return config.isAutoRead() &&                     //默认true
                       maybeMoreDataSupplier.get() &&
                       totalMessages < maxMessagePerRead &&  //maxMessagePerRead 默认16,即默认一次性读取16个链接
                       totalBytesRead > 0;
    }
    

    NioServerSocketChannel的doReadMessages方法

    protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = SocketUtils.accept(javaChannel());//java.nio.channels.SocketChannel
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));//this指服务端Channel,ch指jdk中客户端Channel
                    return 1;//读到一条Channel
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
            return 0;
        }
    

    二、创建NioSocketChannel(客户端Channel):

    • new NioSocketChannel(parent, ch)[入口]
    • AbstractNioByteChannel(p,ch, op_read)
      (1)configureBlocking(false)&save op:调用父类构造函数设置Channel为非阻塞模式保存读事件
      (2)create id(channel唯一标识),unsafe(底层数据读写),pipeline(业务数据逻辑载体)
    • new NioSocketChannelConfig()
      (1)setTcpNoDelay(true)禁止Nagle算法即小数据包尽量发送降低延迟
    protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = SocketUtils.accept(javaChannel());//java.nio.channels.SocketChannel
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));//this指服务端Channel,ch指jdk中客户端Channel
                    return 1;//读到一条Channel
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
            return 0;
    }
    

    逐层调用父类构建函数

    public NioSocketChannel(Channel parent, SocketChannel socket) {
            super(parent, socket);
            config = new NioSocketChannelConfig(this, socket.socket());
    }
    
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
            super(parent, ch, SelectionKey.OP_READ);//读事件
    }
    
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;//jdk中Channel
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);// 非阻塞模式
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
    }
    
    //各个组件的创建id、unsafe、pipeline
    protected AbstractChannel(Channel parent) {
            this.parent = parent;//服务端Channel
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
    }
    
    private final class NioSocketChannelConfig  extends DefaultSocketChannelConfig {
            private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
                super(channel, javaSocket);
            }
    
            @Override
            protected void autoReadCleared() {
                clearReadPending();
            }
        }
    
    public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
            super(channel);
            if (javaSocket == null) {
                throw new NullPointerException("javaSocket");
            }
            this.javaSocket = javaSocket;
    
            // Enable TCP_NODELAY by default if possible.
            if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
    //private static final boolean CAN_ENABLE_TCP_NODELAY_BY_DEFAULT = !isAndroid();
                try {
    //默认服务端linux设置setTcpNoDelay(true),禁止Nagle算法,即小数据包尽量发送降低延迟,而不是将小数据包积攒成大数据包
                    setTcpNoDelay(true);
                } catch (Exception e) {
                    // Ignore.
                }
            }
        }
    
    #JDK的socket类中
    /**
         * Enable/disable {@link SocketOptions#TCP_NODELAY TCP_NODELAY}
         * (disable/enable Nagle's algorithm).
         *
         * @param on {@code true} to enable TCP_NODELAY,
         * {@code false} to disable.
         *
         * @exception SocketException if there is an error
         * in the underlying protocol, such as a TCP error.
         *
         * @since   JDK1.1
         *
         * @see #getTcpNoDelay()
         */
        public void setTcpNoDelay(boolean on) throws SocketException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            getImpl().setOption(SocketOptions.TCP_NODELAY, Boolean.valueOf(on));
        }
    

    三、Netty中Channel的分类

    Netty中的Channel的分类:

    • NioServerSocketChannel[服务端Channel]:继承AbstractNioMessageChannel,注册事件为OP_ACCEPT事件,创建NioServerSocketChannelConfig
    • NioSocketChannel[客户端Channel]:继承AbstractNioByteChannel,注册事件为OP_READ事件,创建NioSocketChannelConfig
    • Unsafe[实现Channel读写抽象]:服务端NioMessageUnsafe读连接,客户端NioByteChannel读数据
    channel层级关系.png
    ChannelConfig.png

    AbstractNioChannel

    /**
     * Abstract base class for {@link Channel} implementations which use a Selector based approach.
     */
    public abstract class AbstractNioChannel extends AbstractChannel {
    
        private static final InternalLogger logger =
                InternalLoggerFactory.getInstance(AbstractNioChannel.class);
    
        private final SelectableChannel ch;//jdk channel
       //IO事件,NioServerSocketChannel和NioSocketChannel不同,服务端注册accept事件,客户端注册read事件
        protected final int readInterestOp;
        volatile SelectionKey selectionKey;//SelectionKey 
    
        /**
         * Create a new instance
         *
         * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
         * @param ch                the underlying {@link SelectableChannel} on which it operates
         * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
         */
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }
    
    }
    

    服务端NioServerSocketChannel继承AbstractNioMessageChannel,通过accept事件读连接

    //NioServerSocketChannel extends AbstractNioMessageChannel
    public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);//doReadMessages
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
    
    protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = SocketUtils.accept(javaChannel());// accept客户端接入的Channel
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    

    客户端Channel继承AbstractNioByteChannel,读取数据

    public final void read() {
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));//读取数据
                        if (allocHandle.lastBytesRead() <= 0) {//直到读不到数据
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);//传递读事件
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.attemptedBytesRead(byteBuf.writableBytes());
            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
     }
    

    四、新连接NioEventLoop的分配和selector注册

    NioServerSocketChannel类中,将读取每一个新链接通过fireChannelRead传播到ServerBootstrapAcceptor

    public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        //将注册的每一个新链接通过fireChannelRead传播到ServerBootstrapAcceptor
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
    

    服务端启动时,添加一个ServerBootstrapAcceptor的ChannelHander

    ServerBootstrapAcceptor.png
    void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            //添加ServerBootstrapAcceptor
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    ServerBootstrapAcceptor中的channelRead

    • 添加childHandler[将自定义ChannelHandler添加到新连接的Pipeline]
    • 设置options和attrs:设置childOptions和childAttrs
    • 选择NioEventLoop并注册到Selector:调用Chooser的next()方法选择NioEventLoop,通过doRegister()方法注册到Selector
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);//新链接添加hander(通过ChannelInitializer,添加用户定义的hander)
    
                setChannelOptions(child, childOptions, logger);//设置childOptions
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());//设置childAttrs
                }
    
                try {
                    childGroup.register(child).addListener(new ChannelFutureListener() {//childGroup(workGroup)注册
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
    

    上面的childGroup.register最终调用AbstractChannel的register方法

    #AbstractChannel中
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                if (eventLoop == null) {
                    throw new NullPointerException("eventLoop");
                }
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
    
                AbstractChannel.this.eventLoop = eventLoop;//保存客户端eventLoop
    
                if (eventLoop.inEventLoop()) {//传入eventLoop为客户端eventLoop,而此时发起线程为服务端eventLoop,这里返回false
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
    
            private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    
    #AbstractNioChannel中
    protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                 //eventLoop().unwrappedSelector()即传入eventloop对应的Selector,this即当前NioSocketChannel
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    

    五、NioSocketChannel读事件的注册

    AbstractChannel中register0执行pipeline.fireChannelActive()

    private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();//此处执行
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    
    public final ChannelPipeline fireChannelActive() {
            AbstractChannelHandlerContext.invokeChannelActive(head);//从head开始
            return this;
    }
    
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelActive();
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelActive();
                    }
                });
            }
        }
    
        private void invokeChannelActive() {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelActive(this);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelActive();
            }
        }
    
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelActive();//向后传播
                readIfIsAutoRead();
     }
    
    private void readIfIsAutoRead() {
                if (channel.config().isAutoRead()) {//自动读,即像selector注册个读事件
                    channel.read();//最终调用NioByteUnsafe的beginRead方法
                }
    }
    
    #AbstractChannelHandlerContext
    public ChannelHandlerContext read() {
            final AbstractChannelHandlerContext next = findContextOutbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeRead();
            } else {
                Runnable task = next.invokeReadTask;
                if (task == null) {
                    next.invokeReadTask = task = new Runnable() {
                        @Override
                        public void run() {
                            next.invokeRead();
                        }
                    };
                }
                executor.execute(task);
            }
    
            return this;
        }
    
    
    #AbstractChannelHandlerContext
    private void invokeRead() {
            if (invokeHandler()) {
                try {
                    ((ChannelOutboundHandler) handler()).read(this);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                read();
            }
        }
    
    #HeadContext extends AbstractChannelHandlerContext
    public void read(ChannelHandlerContext ctx) {
                unsafe.beginRead();
    }
    
    #AbstractUnsafe
    public final void beginRead() {
                assertEventLoop();
    
                if (!isActive()) {
                    return;
                }
    
                try {
                    doBeginRead();
                } catch (final Exception e) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireExceptionCaught(e);
                        }
                    });
                    close(voidPromise());
                }
            }
    
    #AbstractNioChannel
    protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
    
            readPending = true;
    
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);//绑定读事件
            }
        }
    

    总结
    Netty新连接接入处理逻辑:
    服务端Channel绑定的NioEventLoop即Boss线程轮询OP_ACCEPT事件,调用服务端Channel的accept()方法获取客户端Channel封装成NioSocketChannel,封装创建组件Unsafe用来实现Channel读写和Pipeline负责数据处理业务逻辑链,
    服务端Channel通过连接接入器ServerBootstrapAcceptor给客户端Channel分配NioEventLoop,将客户端Channel绑定到Selector上面,通过传播Channel Active方法将客户端Channel读事件注册到Selector

    问:Netty是在哪里检测有新连接接入的?
    答:Boss线程通过服务端Channel绑定的Selector轮询OP_ACCEPT事件,通过JDK底层Channel的accept()方法获取JDK底层SocketChannel创建新连接
    问:新连接是怎样注册到NioEventLoop线程的?
    答:Worker线程调用Chooser的next()方法选择获取NioEventLoop绑定到客户端Channel,使用doRegister()方法将新连接注册到NioEventLoop的Selector上面

    相关文章

      网友评论

          本文标题:netty系列之(四)——新连接接入

          本文链接:https://www.haomeiwen.com/subject/ezgybftx.html