美文网首页
netty源码分析(四) - 新连接接入

netty源码分析(四) - 新连接接入

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-10 17:25 被阅读0次

概述

新连接接入.png

新连接接入流程大致分为4步:

  1. 检测新连接:通过服务端channel绑定的selector轮询出accept事件
  2. 创建NioSocketChannel:基于jdk的socketChannel创建netty的NioSocketChannel
  3. 分配线程及注册selector:给新创建的NioSocketChannel分配NioEventLoop,以后该channel的所有事件都通过该loop来管理,并注册到对应的selector上
  4. 向selector注册读事件:注册过程跟服务端启动时注册accept事件复用同一段逻辑

1. 检测新连接

注意:服务端channel和客户端channel都有NioEventLoop,很多代码是公用的,通过策略模式执行到不通channel的逻辑,看代码时要通过channel的不同进行区分(channel的结构可以参考《netty源码分析(一) - 基本组件》)

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    //unsafe:NioMessageUnsafe
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    try {
        int readyOps = k.readyOps();            
        // read 和 accept事件都通过unsafe.read();进行处理;ServerSocketChannel的read是指读取一条连接;SocketChannel是指读取IO数据
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
  • unsafe:当前是服务端channel的NioEventLoop,unsafe是NioMessageUnsafe
  • unsafe.read():read 和 accept事件都通过unsafe.read()进行处理;服务端Channel的read是指读取一条连接;客户端是指读取IO数据

NioMessageUnsafe:read

public void read() {
    //服务端channel config NioServerSocketChannel启动时创建
    final ChannelConfig config = config();
    //服务端channel pipeline NioServerSocketChannel启动时创建
    final ChannelPipeline pipeline = pipeline();
    //allocHandle控制连接读取的速率,默认是16
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                //循环读取连接并放到readBuf中(List)
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            //遍历readBuf,通过pipeline传播到ServerBootstrapAcceptor中执行
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        //服务端channel通过pipeline传播channelReadComplete事件
        pipeline.fireChannelReadComplete();
                if (exception != null) {
                    closed = closeOnReadError(exception);
                    //读取连接报错时传播fireExceptionCaught事件
                    pipeline.fireExceptionCaught(exception);
                }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
  1. 循环读取连接并放到readBuf中(List)
  2. 遍历readBuf,pipeline传播fireChannelRead事件到ServerBootstrapAcceptor中执行
  3. 通过pipeline传播channelReadComplete事件
  4. 读取连接报错时传播fireExceptionCaught事件

NioServerSocketChannel:doReadMessages

protected int doReadMessages(List<Object> buf) throws Exception {
    //1. 调用jdk底层accept方法获取jdk channel即SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            //2. 创建客户端channel(NioSocketChannel),把jdk channel包装在里面
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        //忽略非重点代码
    }
    return 0;
}
  1. 调用jdk底层accept方法获取jdk channel即SocketChannel
  2. 创建客户端channel(NioSocketChannel),把jdk channel包装在里面
  3. 创建的NioSocketChannel放到buf中返回

2. 创建NioSocketChannel

this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
config = new NioSocketChannelConfig(this, socket.socket());
  • 创建id/unsafe/pipeline,
  • ch:jdk SocketChannel
  • readInterestOp:感兴趣的事件-read事件
  • NioSocketChannelConfig:会默认关闭Nagle算法(setTcpNoDelay(true)),小的数据包尽可能发出去,降低延迟

3. 分配线程及注册selector

前面init流程中分析过服务端channel初始化时会添加一个handle即ServerBootstrapAcceptor,其中重写了channelRead(详细分析参考《netty源码分析(二) - 服务端启动 - 2》,下面只分析下ServerBootstrapAcceptor创建过程)

pipeline.addLast(new ServerBootstrapAcceptor(
     ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

//workGroup工作线程组
private final EventLoopGroup childGroup;
//childHandler:启动类中配置(EchoServer)
private final ChannelHandler childHandler;
//childOptions:启动类中配置
private final Entry<ChannelOption<?>, Object>[] childOptions;
//childAttrs:启动类中配置
private final Entry<AttributeKey<?>, Object>[] childAttrs;
  • childGroup:workGroup工作线程组
  • childHandler:启动类中配置(EchoServer)
  • childOptions:启动类中配置
  • childAttrs:启动类中配置
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    //把childHandler加到客户端channel(NioSocketChannel) pipeline上
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);
    try {
        //NioSocketChannel注册到childGroup(实际就是NioEventLoop数组)的selectors上
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
  1. 把childHandler加到客户端channel(NioSocketChannel) pipeline上
  2. 配置childOptions/childAttrs到NioSocketChannel上
  3. 从childGroup中选择NioEventLoop并注册selector;register过程跟NioServerSocketChannel启动时注册selector时流程一致

只分析下最终执行的register0方法

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        //调用jdk底层进行注册感兴趣的事件,但是此时注册的实际是0,不是read事件
        doRegister();
        neverRegistered = false;
        //执行pipeline上handlerAdded方法
        pipeline.invokeHandlerAddedIfNeeded();
        //cas设置注册成功状态
        safeSetSuccess(promise);
        //执行pipeline上channelRegistered方法
        pipeline.fireChannelRegistered();
        //判断当前channel是否以及open
        if (isActive()) {
            //firstRegistration 默认为true,执一次(注册完成)之后变为false
            if (firstRegistration) {
                //执行pipeline上channelActive方法,在HeadContext中执行到readIfIsAutoRead()真正注册read事件
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
  1. 调用jdk底层进行注册感兴趣的事件,但是此时注册的实际是0,不是read事件
  2. 执行pipeline上handlerAdded方法
  3. 执行pipeline上channelRegistered方法
  4. 首次执行时执行pipeline上channelActive方法,在HeadContext中执行到readIfIsAutoRead()真正注册read事件
protected void doRegister() throws Exception {
    for (;;) {
        try {
            //注册 0 事件
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            //忽略非重点代码
        }
    }
}
  • register三个参数:分别为当前NioEventLoop中的selector、感兴趣的事件 0、attachment为当前NioSocketChannel

4. 向selector注册读事件

注册读事件流程在HeadContext的readIfIsAutoRead()方法中,执行逻辑跟《netty源码分析(二) - 服务端启动 - 2》服务端绑定端口后的注册逻辑一致;唯一区别是当前是客户端channel,注册的是read事件

至此新连接接入流程分析完毕

相关文章

网友评论

      本文标题:netty源码分析(四) - 新连接接入

      本文链接:https://www.haomeiwen.com/subject/xiedpktx.html