美文网首页
(十)netty是如何读取数据

(十)netty是如何读取数据

作者: guessguess | 来源:发表于2021-05-21 09:13 被阅读0次

    当selector监听到有数据可读的时候,必然是通过channel去读取数据的。
    所以入口还是channel的read方法。
    由于我们使用的是NioSocketChannel.所以就关注一下NioSocketChannel的read方法。

    最后定位到,read方法的实现,是在NioSocketChannel的父类AbstractNioByteChannel中。
    代码如下

    public abstract class AbstractNioByteChannel extends AbstractNioChannel {
            @Override
            public final void read() {
                final ChannelConfig config = config();
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                readPending = false;
                            }
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        .............通过pipeline去传播read事件。
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    ................读取完成后,通过pipeline去传播read完成事件。
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    }
    

    从上面代码可以大概猜出,先传播读的事件,后续传播读完成的事件。
    从代码上可以看出是通过channel的pipeline发起的。因此需要关注Pipeline中对于read事件的处理。
    由于pipeline的类是DefaultChannelPipeline。所以看看DefaultChannelPipeline中的实现。

    public class DefaultChannelPipeline implements ChannelPipeline {
        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
    
    
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
    
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    head实现了channelRead方法。
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    }
    

    从上面代码看到,read是headContext发起的。最后通过headContext中实现的channelRead方法去读取数据。所以接下来,我们看看head是如何实现channelRead方法的。

    public class DefaultChannelPipeline implements ChannelPipeline {
        final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ctx的类型是AbstractChannelHandlerContext,父类中的实现都是往下传递。所以接下来往下看
                ctx.fireChannelRead(msg);
            }
        }
    }
    

    接下来看AbstractChannelHandlerContext中是如何进行传播的。
    从以下代码可以看出。
    1.先找到下一个Inbound=true的ctx
    2.执行ctx所实现的channelRead方法。
    3.ctx的channelRead方法如果是继续传播,则会1,2,3步骤一直递归。当然也可以考虑中断传播,不fire即可。

    abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
            return this;
        }
         
        找到下一个inbound属性为true的ctx
        private AbstractChannelHandlerContext findContextInbound(int mask) {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while ((ctx.executionMask & mask) == 0);
            return ctx;
        }
    
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
         
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    执行channelRead方法,一般都是进行传播,不进行相关操作。
                    当然也可以有其他操作,比如像反序列化,反序列化之后再进行传播,这点跟序列化其实是一样的。具体可以看看ByteToMessageDecoder的channelRead方法。
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    }
    

    总结一下流程图

    netty读数据的流程

    相关文章

      网友评论

          本文标题:(十)netty是如何读取数据

          本文链接:https://www.haomeiwen.com/subject/pdrbjltx.html