美文网首页
Netty源码分析----NioEventLoop之处理请求

Netty源码分析----NioEventLoop之处理请求

作者: _六道木 | 来源:发表于2018-06-27 00:18 被阅读0次

    (*文章基于Netty4.1.22版本)
    之前已经讲了NioEventLoop的初始化以及核心的队列了,最后还涉及到的就是非常核心的一部分,就是NioEventLoop如何去处理请求。

    请求入口----run

    其核心逻辑是run方法,记得之前说过,NioEventLoop的线程初始化的时候,会调用一下run方法开始处理请求

        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                        // ....
                        SingleThreadEventExecutor.this.run();
                        // ....
                }
            });
        }
    

    接下来就看下run方法的实现

        @Override
        protected void run() {
            for (;;) {
               // ....
            }
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
        }
    

    上面是run方法的大体结构,可以看到除非Shutdown了,否则会在一个无限循环中不停的处理事情。Nio也是类似的,不停的从selector中获取selectionKey,然后进行处理,而这里,大同小异,不过是Netty封装了很多东西(例如处理队列任务),接下来看下for循环中的逻辑

    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
        case SelectStrategy.CONTINUE:
            continue;
        case SelectStrategy.SELECT:
            select(wakenUp.getAndSet(false));// 获取准备好的selectionKey;类似selector.select方法,这里做了额外的一些操作
    
            if (wakenUp.get()) {// 是否需要唤醒selector
                selector.wakeup();
            }
        default:
    }
    
    cancelledKeys = 0;
    needsToSelectAgain = false;
    // 这里就是io任务占用时间与非io任务占用时间的占比,之前一直提起的就是这个值
    // 其值=100 * io时间 /(io时间-非io时间) 
    final int ioRatio = this.ioRatio;
    if (ioRatio == 100) {// 根据算法,非io时间为0,那么则全部进行io任务
        try {
            processSelectedKeys();
        } finally {// 执行完io任务后才会执行非io任务,且不需要限制时间
            runAllTasks();
        }
    } else {// 如果设置了占比,那么需要
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
        } finally {// 执行完io任务,根据占比求得非io任务运行的时间限制
            final long ioTime = System.nanoTime() - ioStartTime;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
    

    前半部分主要是获取准备就绪的selectionKey,后半部分主要是执行io任务和非io任务,根据ioRatio的值将非io任务区分为无限制时间执行和有限制的执行。

    获取准备就绪的事件----select

    runAllTasks方法在任务队列的文章中分析过,这里便不再分析。在nio的程序中,使用selector.select方法可以获取到准备就绪的selectionKey,netty在这里是调用了select,进行了除selector.select之后的一些处理,代码比较长,截取了主要的部分

        private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;// select次数
                long currentTimeNanos = System.nanoTime();
                // delayNanos返回最近一个延时离现在的间隔时间,
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
                for (;;) {
                    //  等于(delayNanos+500000L)/ 1000000L
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {// 如果delayNanos<500000L,那么timeoutMillis<=0,该情况直接调用selectNow返回,因为有任务需要执行
                        if (selectCnt == 0) {
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
                    // 否则调用select方法,阻塞timeoutMillis时间
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;//次数+1
                    //....
                    //满足如下条件则返回
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    
                    long time = System.nanoTime();
                    // 即 当前时间-开始执行的时间>=超时时间
                    // 到达这里证明没有获取到准备就绪的selectionKey
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        // 如果到达这个分支,证明在timeoutMillis时间内
                        // 没有获取到事件,但是又没有阻塞,可能触发了NIO selector空转的BUG
                        // (理论上select传入x时间,会阻塞x,即当前时间-开始时间会>=超时时间,出现这种情况证明还没到阻塞x时间就返回了)
                        // 判断select的次数是否大于SELECTOR_AUTO_REBUILD_THRESHOLD,
                        // 如果满足了,代表可能触发了NIO的bug,那么调用rebuildSelector重建selector
                        rebuildSelector();
                        selector = this.selector;
    
                        // 再用新的selector获取selectionKey.
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
                    currentTimeNanos = time;
                }
                //....
                }
            } catch (CancelledKeyException e) {
            //....
            }
        }
    

    主要的流程看一下注释就好了,和NIO程序差不多,Netty在这之上,根据其他条件,控制了select阻塞时间,或者说控制是否阻塞,另外在触发了NIO的BUG的情况下,如何解决,这个主要是rebuildSelector

        private void rebuildSelector0() {
            final Selector oldSelector = selector;
            final SelectorTuple newSelectorTuple;
            //....
            try {
                newSelectorTuple = openSelector();//创建一个新的SelectorTuple
            } catch (Exception e) {
                return;
            }
    
            // 将对应的Channel注册到新的Selector上.
            int nChannels = 0;
            for (SelectionKey key: oldSelector.keys()) {
                Object a = key.attachment();
                try {
                    //....
                    int interestOps = key.interestOps();
                    key.cancel();
                    SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                    if (a instanceof AbstractNioChannel) {
                        // Update SelectionKey
                        ((AbstractNioChannel) a).selectionKey = newKey;
                    }
                    nChannels ++;
                } catch (Exception e) {
                    //....
                }
            }
    
            selector = newSelectorTuple.selector;
            unwrappedSelector = newSelectorTuple.unwrappedSelector;
    
            try {
                // 关闭旧的selector
                oldSelector.close();
            } catch (Throwable t) {
                //....
            }
        }
    

    就绪事件处理----processSelectedKeys

    然后看下一下如何处理SelectionKey

        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    

    两个分支,根据是否设置了selectedKeys 来判断是否要使用Netty优化的策略进行处理selectionKey,两种方式的获取的SelectionKey集合不一样,在优化的策略中,使用的是SelectedSelectionKeySet类型,与原生NIO的不一样,这里先看下selectedKeys的初始化
    在NioEventLoop初始化的时候,会调用openSelector创建一个Selector,这里判断了是否要使用优化的策略(只保留核心代码)

    SelectionKey的优化

        private static final boolean DISABLE_KEYSET_OPTIMIZATION =
                SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
        private SelectorTuple openSelector() {
            final Selector unwrappedSelector;
                    // ....
                unwrappedSelector = provider.openSelector();
                   // ....
    
            if (DISABLE_KEYSET_OPTIMIZATION) {
                return new SelectorTuple(unwrappedSelector);
            }
    
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
            // ....
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                            // ....
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                                // ....
                        selectedKeysField.set(unwrappedSelector, selectedKeySet);
                        publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                        return null;
                            // ....
                }
            });
    
            selectedKeys = selectedKeySet;
            // ....
        }
    

    首先创建一个selector,然后会判断DISABLE_KEYSET_OPTIMIZATION这个变量,这里意思是是否禁用SelectionKey的优化,那么如果false,则会先创建SelectedSelectionKeySet类型的变量,后面,使用反射,将这个类型的变量替换调原生Selector中的SelectionKey集合,其原生类型是一个Set,而SelectedSelectionKeySet也是Set的子类,所以这样转换没有问题,到最后,再其引用赋值给NioEventLoop的成员变量selectedKeys,那么此时selectedKeys与Selector中的Key集合都是指向同一个实例,也就是说,当Selector进行select操作获取到SelectionKey之后,selectedKeys就会有值。

    Selector在进行select操作之后,会调用Set的add方法将selectionKey放入其中,Netty实现了自己的Set的原因在于原生是使用HashSet,其底层是HashMap,遍历的效率较低,而SelectedSelectionKeySet底层是一个数组,遍历相对较快,而我们在得到selectionKey集合的时候需要遍历集合进行处理

    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
        SelectionKey[] keys;
        int size;
    
        SelectedSelectionKeySet() {
            keys = new SelectionKey[1024];
        }
    
        @Override
        public boolean add(SelectionKey o) {
            if (o == null) {
                return false;
            }
    
            keys[size++] = o;
            if (size == keys.length) {
                increaseCapacity();
            }
    
            return true;
        }
      //....
    }
    

    可以看到内部是一个SelectionKey类型的数组

    处理获取到的SelectionKey----processSelectedKeysOptimized

    接下来看下processSelectedKeysOptimized的处理逻辑

        private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {// 遍历集合
                final SelectionKey k = selectedKeys.keys[i];
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.keys[i] = null;
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    

    在注册的时候,att参数类型是ServerSocketChannel,所以这里是AbstractNioChannel类型,调用processSelectedKey处理

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            // 一个Channel会对应一个Unsafe对象,负责底层的操作
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    //....
    
            //....
          unsafe.read();
    //....
        }
    

    底层也是通过unsafe来处理请求

            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                          //readBuf为List,最多放了一个元素,为NioSocketChannel对象
                            int localRead = doReadMessages(readBuf);
                           //....
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {// 触发pipeline的read事件的传播
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();//触发read complete事件传播
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
                        // 如果有异常的话,触发异常事件传播
                        pipeline.fireExceptionCaught(exception);
                    }
                } finally {
                    //....
                }
            }
        }
    

    doReadMessages方法如下:

        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = SocketUtils.accept(javaChannel()); 
            //....
           if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
                //...       
            return 0;
        }
    

    将原生的socketchannel转换成NioSocketChannel,处理类似NIO程序的代码

      ServerSocketChannel channel = (ServerSocketChannel) key.channel();
      SocketChannel socketChannel = channel.accept();
    

    将READ事件注册到WorkEventLoop

    另外看下,触发的两个事件,一个是read事件,一个是readComplete事件,先看下read事件的传播。

    回顾一下服务启动和pipeline文章相关的分析,这个时候pipeline只有3个Hander(Context),如下:
    HeadContext -> ServerBootstrapAcceptor -> TailContext
    HeadContext没有对read事件进行处理,直接传播到下一个Handler,即ServerBootstrapAcceptor,看下其channelRead方法:

            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
                // 为当前Channel的pipeline添加自定义Handler
                child.pipeline().addLast(childHandler);
                // 设置属性
                setChannelOptions(child, childOptions, logger);
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
                    // 将Channel注册到workergroup的EventLoop上
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
    

    ServerBootstrapAcceptor做的事情很简单:

    1. 为pipeline添加程序中自定义的childHandler(ChannelInitializer)
    2. 设置一下Channel属性
    3. 将Channel注册到workerGroup上

    ServerBootstrapAcceptor主要是衔接boss线程组和worker线程组。

    ServerBootstrapAcceptor的衔接如图:


    image.png

    稍微解释一下:
    Server启动的时候,BossEventLoopGroup使用next方法得到一个EventLoop,当第一次调用execute方法时候,会启动一个线程,执行EventLoop的run方法,其中使用Selector获取Channel中的ACCEPT事件,并得到SocketChannel,将其通过pipeline的处理,这是pipeline有个叫ServerBootstrapAcceptor的节点,内部使用了WorkEventLoopGroup的next方法得到一个EventLoop,第一次执行execute方法的时候会启动一个线程执行EventLoop的run方法,通过Selector获取READ事件并对事件做业务处理

    另外要记住的第一点,是我们一开始调用childHandler加入的Handler(ChannelInitializer)

    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline()
                                .addLast(new StringDecoder())
                                .addLast(new ServerHandler())
                                ;
                        }
                    });
    

    在服务启动文章中分析过

    其实ChannelInitializer实现了Channel注册后的为每一个Channel添加ChannelHandler的功能,但是其本质也是也是一个ChannelHandler

    在注册前,会调用注册前添加的Handler,然后会调用到Handler的handlerAdded方法,其中会调用上面的initChannel,这样就给当前Channel添加了自定义的Handler。
    流程是和之前分析的一样的,细节不同点如下:

    1. 这时候的Channel是NioSocketChannel,服务启动的时候是NioServerSocketChanel
    2. 服务启动的时候,通过ChannelInitializer来为pipeline添加ServerBootstrapAcceptor这个Handler,而此时通过ChannelInitializer为pipeline添加业务上自定义的Handler
    3. 两者添加的Handler功能不一样,一个是衔接boss和worker线程组,是系统内置功能;一个是处理业务请求,编码者自定义

    worker线程的channel注册过程是一样的,最后也会执行NioEventLoop的run方法,处理SelectionKey,同样执行到unsafe的read方法,但是触发pipeline的read事件的时候,pipeline中的Handler不一样,此时的pipeline是业务上自定义的Handler,如下

        private static class ServerHandler extends ChannelInboundHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg)
                    throws Exception {
                String body = (String)msg;
                System.out.println("receive body:"+body + ",currentHandler:"+this);
                ctx.write(msg);
                ctx.flush();
            }
        }
    

    接下来看下readComplete事件,会依次触发pipeline上的Handler的channelReadComplete方法,上面说过此时pipeline的Handler如下
    HeadContext -> ServerBootstrapAcceptor -> TailContext
    只有HeadContext有逻辑,另外都是直接往后传播或者空方法,看下HeadContext的逻辑

            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelReadComplete();// 往后传播,后面的Handler没处理,直接无视
                readIfIsAutoRead();
            }
    
            private void readIfIsAutoRead() {
                if (channel.config().isAutoRead()) {// autoRead是否打开
                    channel.read();// 这个最后会调用unsafe的beginRead方法
                }
            }
    

    之前讲过,Channel初始化的时候会设置关心的事件,但是没有注册上去,在unsafe的beginRead方法里才是注册事件的地方,不过在注册的时候isActive为true,触发了channelActive方法,提前注册了事件

            private void register0(ChannelPromise promise) {
                try {
                    // ....
                    // 间接触发了unsafe的begin的beginRead方法
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    //....
                }
            }
    

    之前说过NioServerSocketChannel和NioSocketChannel的unsafe对象不一样,NioServerSocketChannel对应的是NioMessageUnsafe,NioSocketChannel对应的是NioSocketChannelUnsafe,所以worker线程组的NioEventLoop最后处理的是NioSocketChannelUnsafe的read方法(实际是其父类的read方法)

            @Override
            public final void read() {
                //....
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);// 分配池化内存
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));//从Channel中读取数据到buffer
                        //....
                        pipeline.fireChannelRead(byteBuf);// 触发事件传播
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
                    //....
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);//异常处理
                } finally {
                    // ....
                }
            }
        }
    

    逻辑和NioMessageUnsafe的read方法类似,触发的事件一样,不同的是pipeline中handler不同,这里的fireChannelRead会触发我们自定义的channelRead方法,且传递的对象不一样,这个时候是实际的业务数据。

    到这里,整个处理请求的过程就完毕了,稍微总结一下:

    1. Netty在selector.select之上,判断了任务,唤醒状态位等一些条件,以此决定select是否该阻塞和阻塞的时间
    2. 在触发了NIO的BUG的时候,重建selector,主要是将旧的selector对应的channel注册到新的selector上,取消旧的selector和Key
    3. NioEventLoop会处理io任务和非io任务,其时间占比由ioRatio决定
    4. Netty通过反射将NIO原生的SelectionKey集合替换成自定义的集合,主要是数组实现
    5. boss线程组主要负责连接请求,然后派发给worker线程组进行处理
    6. ServerBootstrapAcceptor主要负责boss和worker的衔接,即将channel注册到woker中的NioEventLoop上
    7. worker线程组负责处理数据的读取,通过pipeline将数据传递到自定义的Handler上
    8. boss和worker线程组的NioEventLoop在注册前都会通过一个叫ChannelInitializer的Handler,为Channel添加pipeline
    9. boss线程组的ChannelInitializer负责添加ServerBootstrapAcceptor这个Handler到pipeline上,而worker线程组的ChannelInitializer负责添加自定义的Handler到pipeline上

    相关文章

      网友评论

          本文标题:Netty源码分析----NioEventLoop之处理请求

          本文链接:https://www.haomeiwen.com/subject/enineftx.html