美文网首页
netty源码read

netty源码read

作者: sxt_5cc3 | 来源:发表于2018-08-21 16:25 被阅读0次

    1. boss event loop

         private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {
                final SelectionKey k = selectedKeys.keys[i];
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.keys[i] = null;
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    
    • 关心SelectionKey.OP_READ | SelectionKey.OP_ACCEPT
      private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            // 校验有效性
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                // 关闭连接
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                // SelectionKey.OP_READ | SelectionKey.OP_ACCEPT
                // 连接或者读取
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    

    2. NioMessageUnsafe

    • 获得客户端通信SocketChannel, 使用NioSocketChannel包装
    • 执行handler 链channelRead方法
    • 执行handler 链 channelReadComplete方法

    handler 链: HeadContext、ServerBootstrapAcceptor、TailContext

    public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                // 读取策略类
                // HandlerImpml extends MaxMessageHandle
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                // 重置读取信息
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            // NioServerSocketChannel 实现
                            // 获得客户端通信SocketChannel, 使用NioSocketChannel包装
                            // 添加到readBuf中(ArrayList<NioSocketChannel>())
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
                            // message count + 1
                            allocHandle.incMessagesRead(localRead);
                        }
                        // 默认false
                        while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        // 执行handler 链channelRead方法
                        // NioSocketChannel = readBuf.get(i)
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    // 执行handler 链 channelReadComplete方法
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    

    2.1 HeadContext.channelRead

    继续下一个handler

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ctx.fireChannelRead(msg);
        }
    

    2.2 ServerBootstrapAcceptor.channelRead

    初始化
    不继续执行TailContext.channelRead

     public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
                // 添加ChannelInitializer handler
                // 在NioSocketChannel register后添加HandshakeHandler、NettyEncoder、NettyDecoder、IdleStateHandler
                // NettyConnectManageHandler、NettyServerHandler
    //            .childHandler(new ChannelInitializer<SocketChannel>() {
    //                              @Override
    //                              public void initChannel(SocketChannel ch)  {
    //                                  ch.pipeline()
    //                                          .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
    //                                                  new HandshakeHandler(TlsSystemConfig.tlsMode))
    //                                          .addLast(defaultEventExecutorGroup,
    //                                                  new NettyEncoder(),
    //                                                  new NettyDecoder(),
    //                                                  new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
    //                                                  new NettyConnectManageHandler(),
    //                                                  new NettyServerHandler()
    //                                          );
    //                              }
    //                          }
                child.pipeline().addLast(childHandler);
    
                setChannelOptions(child, childOptions, logger);
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
    
    • child为NioSocketChannel,后续的操作都是在此类中进行

    3. AbstractUnsafe.register

    private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    // promise 是否被取消
                    // java channel是否是open状态
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    // 注册标记
                    boolean firstRegistration = neverRegistered;
                    // javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    // 执行DefaultChannelPipeline中
                    // PendingHandlerCallback包装的ChannelHander的handlerAdded方法
                    // 如果ChannelHander extends ChannelInitializer执行handlerAdded
                    // 后会从DefaultChannelPipeline移除此ChannelHander
                    pipeline.invokeHandlerAddedIfNeeded();
                    // 设置成功, notify 注册的listener
                    safeSetSuccess(promise);
    
                    // 从head开始执行DefaultChannelPipeline的ChannelHandler链
                    // 此示例: 执行完handlerAdded后方法后, 仅剩下默认的HeadContext -> TailContext
                    // 添加ServerBootstrapAcceptor还在任务队列没有执行
                    // 这两个ChannlHandler, 没有任何操作
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    // javaChannel.socket().isBound()
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    
    • 执行register
    • 执行NettyRemotingServer ChannelInitializer, 添加了HandshakeHandler、NettyEncoder、NettyDecoder、
      IdleStateHandler、NettyConnectManageHandler、NettyServerHandler
    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)  {
                            ch.pipeline()
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                    new HandshakeHandler(TlsSystemConfig.tlsMode))
                                .addLast(defaultEventExecutorGroup,
                                    new NettyEncoder(),
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    new NettyConnectManageHandler(),
                                    new NettyServerHandler()
                                );
                        }
                    });
    
    image.png
    • 因为channel已经register, 所以NettyRemotingServer ChannelInitializer添加的handler执行handlerAdd方法

    只有IdleStateHandler在handlerAdded方法中执行了代码

    image.png
    • channelRegistered也只是IdleStateHandler有执行代码,其他都是next handler继续执行
    • channelActive
      HeadContext
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelActive();
                readIfIsAutoRead();
            }
    

    NettyConnectManageHandler

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
                super.channelActive(ctx);
    
                if (NettyRemotingServer.this.channelEventListener != null) {
                    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
                }
            }
    

    IdleStateHandler单独介绍
    其他handler都是ctx.fireChannelUnregistered();

    3.2 readIfIsAutoRead()

    private void readIfIsAutoRead() {
                // 默认为true
                if (channel.config().isAutoRead()) {
                    channel.read();
                }
            }
    

    channel.read()

    public Channel read() {
            pipeline.read();
            return this;
        }
    

    pipeline.read()

    public final ChannelPipeline read() {
            tail.read();
            return this;
        }
    

    tail.read(); 查找handler链, ChannelOutboundHandler类型的handler执行read方法

    HeadContext.read

    public void read(ChannelHandlerContext ctx) {
                unsafe.beginRead();
            }
    

    NioSocketChannel注册读事件, 注册到了worker NioEventLoop的Selector

    image.png

    4. read

    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
    

    4.1 NioByteUnsafe.read

    public final void read() {
                // NioSocketChannelConfig
                final ChannelConfig config = config();
                // 校验SocketChannel有效性
                // (javaChannel().socket().isInputShutdown() || !isActive())
                // 并且
                // (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config))
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
                final ChannelPipeline pipeline = pipeline();
                // ByteBuf分配策略(默认PooledByteBufAllocator)
                final ByteBufAllocator allocator = config.getAllocator();
                // ByteBuf分配辅助类(HandleImpl extends MaxMessageHandle)
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                // 重置初始化信息(统计信息、最大读取信息)
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        // 从SocketChannel中读取数据
                        // 并增加totalBytesRead大小
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        // 判断此次读取到的数据大小
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            // 没有读取到数据, 释放ByteBuf
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
                        // 读取记录+1
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    
    • ByteBuf默认


      image.png
    • doReadBytes(byteBuf) 后


      image.png

    4.1.1 DefaultChannelPipeline.fireChannelRead

    • HeadContext没有任何操作直接下一个
    • HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf>
      判断第一个字节数据类型, 从handler链中删除自己, fire next
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            boolean release = true;
            try {
                // 判断msg是不是泛型执行的类型(ByteBuf)
                if (acceptInboundMessage(msg)) {
                    @SuppressWarnings("unchecked")
                    I imsg = (I) msg;
                    // 执行实现类
                    channelRead0(ctx, imsg);
                } else {
                    release = false;
                    ctx.fireChannelRead(msg);
                }
            } finally {
                if (autoRelease && release) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    
                // mark the current position so that we can peek the first byte to determine if the content is starting with
                // TLS handshake
                msg.markReaderIndex();
    
                byte b = msg.getByte(0);
    
                if (b == HANDSHAKE_MAGIC_CODE) {
                    switch (tlsMode) {
                        case DISABLED:
                            ctx.close();
                            log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
                            break;
                        case PERMISSIVE:
                        case ENFORCING:
                            if (null != sslContext) {
                                ctx.pipeline()
                                    .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
                                    .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                                log.info("Handlers prepended to channel pipeline to establish SSL connection");
                            } else {
                                ctx.close();
                                log.error("Trying to establish a SSL connection but sslContext is null");
                            }
                            break;
    
                        default:
                            log.warn("Unknown TLS mode");
                            break;
                    }
                } else if (tlsMode == TlsMode.ENFORCING) {
                    ctx.close();
                    log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
                }
    
                // reset the reader index so that handshake negotiation may proceed as normal.
                msg.resetReaderIndex();
    
                try {
                    // Remove this handler
                    ctx.pipeline().remove(this);
                } catch (NoSuchElementException e) {
                    log.error("Error while removing HandshakeHandler", e);
                }
    
                // Hand over this message to the next .
                ctx.fireChannelRead(msg.retain());
            }
        }
    
    
    • NettyDecoder extends LengthFieldBasedFrameDecoder 就是解码了
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            ByteBuf frame = null;
            try {
                frame = (ByteBuf) super.decode(ctx, in);
                if (null == frame) {
                    return null;
                }
    
                ByteBuffer byteBuffer = frame.nioBuffer();
    
                return RemotingCommand.decode(byteBuffer);
            } catch (Exception e) {
                log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
                RemotingUtil.closeChannel(ctx.channel());
            } finally {
                if (null != frame) {
                    frame.release();
                }
            }
    
            return null;
        }
    
    • NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand>, 业务处理

    相关文章

      网友评论

          本文标题:netty源码read

          本文链接:https://www.haomeiwen.com/subject/bnjjpftx.html