美文网首页netty
Netty中的水平触发和边缘触发

Netty中的水平触发和边缘触发

作者: 云海_54d4 | 来源:发表于2018-02-25 14:30 被阅读703次

    在非阻塞IO中,通过Selector选出准备好的fd进行操作。有两种模式,一是水平触发(LT),二是边缘触发(ET)。

    在LT模式下,只要某个fd还有数据没读完,那么下次轮询还会被选出。而在ET模式下,只有fd状态发生改变后,该fd才会被再次选出。ET模式的特殊性,使在ET模式下的一次轮询必须处理完本次轮询出的fd的所有数据,否则该fd将不会在下次轮询中被选出。

    在Netty中,NioChannel体系是水平触发,EpollChannel体系是边缘触发。

    从源码中,NioServerSocketChannel对一次轮询出的SelectionKey.OP_ACCEPT的处理来看:

    AbstractNioMessageChannel#read()

           public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
    

    1.使用allocHandle维护一个大小合适的缓冲区
    2.循环调用doReadMessage(readBuf),该方法内部是调用java nio中的serverSocketChannel.accept() 获得SocketChannel,并包装成netty中的NioSocketChannel,然后放入readBuf。
    3.pipeline.fireChannelRead(readBuf.get(i)),让每一个NioSocketChannel经过NioServerSocketChannel的handler链进行处理。
    4.finally{...}中,调用removeReadOp(),移除该Channel的SelectionKey中的OP_ACCEPT值

    水平触发主要体现在4,Netty为了使每次轮询负载均衡,不至于一次轮询要的readBuf内存过大,所以限制了readBuf大小,导致每次轮询所能够处理的数据有限,这就可能使一次轮询不回读完fd中的数据。在finally{...}中移除了OP_ACCEPT是因为他工作在LT触发模式下,即使移除了,只要fd中还有数据,下次轮询仍然会把该fd选出进行处理。

    而AbstractEpollServerChannel相同工作的代码如下:

    @Override
            void epollInReady() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                if (shouldBreakEpollInReady(config)) {
                    clearEpollIn0();
                    return;
                }
                final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
                allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
    
                final ChannelPipeline pipeline = pipeline();
                allocHandle.reset(config);
                allocHandle.attemptedBytesRead(1);
                epollInBefore();
    
                Throwable exception = null;
                try {
                    try {
                        do {
                            // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
                            // EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
                            // enabled.
                            allocHandle.lastBytesRead(socket.accept(acceptedAddress));
                            if (allocHandle.lastBytesRead() == -1) {
                                // this means everything was handled for now
                                break;
                            }
                            allocHandle.incMessagesRead(1);
    
                            readPending = false;
                            pipeline.fireChannelRead(newChildChannel(allocHandle.lastBytesRead(), acceptedAddress, 1,
                                                                     acceptedAddress[0]));
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        pipeline.fireExceptionCaught(exception);
                    }
                } finally {
                    epollInFinally(config);
                }
            }
        }
    

    epoll同样采用了allocHandle来使每次轮询负载均衡,不同的是finally{...}移除SeletionKey中该事件的处理。
    epollInFinally(config):

            final void epollInFinally(ChannelConfig config) {
                maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead();
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    clearEpollIn();
                } else if (readPending && maybeMoreDataToRead) {
                    // trigger a read again as there may be something left to read and because of epoll ET we
                    // will not get notified again until we read everything from the socket
                    //
                    // It is possible the last fireChannelRead call could cause the user to call read() again, or if
                    // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
                    // to false before every read operation to prevent re-entry into epollInReady() we will not read from
                    // the underlying OS again unless the user happens to call read again.
                    executeEpollInReadyRunnable(config);
                }
            }
    

    注释中作出了说明:
    如果fd中仍有未读完的数据,必须调用executeEpollReadRunnable(config),自己触发EPOLLIN event,否则会因为没有读完数据导致socket不可用。

    总结:
    Netty为了使每次轮询负载均衡,限制了每次从fd中读取数据的最大值,造成一次读事件处理并不会读完fd中的所有数据。在NioServerSocketChannel中,由于其工作在LT模式下,所以不需要做特殊处理,在处理完一个事件后直接从SelectionKey中移除该事件即可,如果有未读完的数据,下次轮询仍会获得该事件。而在EpollServerSocketChannel中,由于其工作在ET模式下,如果一次事件处理不把数据读完,需要手动地触发一次事件作为补偿,否则下次轮询将不会有触发的事件。

    相关文章

      网友评论

        本文标题:Netty中的水平触发和边缘触发

        本文链接:https://www.haomeiwen.com/subject/zhdrxftx.html