美文网首页
Netty源码分析3:新连接接入

Netty源码分析3:新连接接入

作者: LucasHao | 来源:发表于2020-03-31 16:28 被阅读0次

    本文参考自慕课网《Java读源码之netty》

    如何检测新连接? boss线程轮循出accept事件,通过底层的accept()接收连接

    如何注册到NioEventLoop线程? 通过serverSocketChannel的Adptor中,调用workGroup的chooser.next()来确定EventLoop,从而调用其register()绑定selector,并设置对读事件敏感

    四步骤:

    • 检测新连接的accept事件
    • 创建netty封装的channel
    • 分配到eventloop线程并注册到selector
    • 向selector注册读事件

    1.新连接检测

    • step1: 在NioEventLoop中run()的时候调用了processSelectedKey()方法;其中对于SelectionKey这个参数,如果检测到key的事件类型为read || accept事件的时候调用unsafe.read()函数;

    NioEventLoop.java

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            NioUnsafe unsafe = ch.unsafe();
            //...
            if ((readyOps & 17) != 0 || readyOps == 0) {
                unsafe.read();
                //...
            }
        }
    
    • step2: unsafe.read()是父类AbstractNioMessageChannel中的read()函数,其调用NioServerSocketChannel中的doReadMessages()来接收channel;

    AbstractNioMessageChannel.java

        public void read() {
                //...
                try {
                    int localRead;
                    try {
                        do {
                            localRead = AbstractNioMessageChannel.
                                    this.doReadMessages(this.readBuf);
                            //...当发现读完了,那么localRead就是一个非正数,就break;跳出
                            allocHandle.incMessagesRead(localRead);//连接计数
                        } while(allocHandle.continueReading());//一直读到没有限定条件
                        //.... 
                    }
                }
        }
    
    //可以看到这里限定的是netty单次处理read()事件允许接受的一些最大门限
        public boolean continueReading() {
                return this.config.isAutoRead()
                && this.totalMessages < this.maxMessagePerRead   //这个值默认16
                && this.totalBytesRead < 2147483647;
            }
    
    • step3: doReadMessages()的方法内,首先把jdk原生的channel拿到,然后封装为netty自己的channel;

    NioServerSocketChannel.java

        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = this.javaChannel().accept();  //接收到ch
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));//并放入上面MessageChannel中的List中
                    return 1;
                }
                //否则没有新连接了,返回非正数
            }
            //...
        }
    

    2.NioChannel的创建

    通过上述的new NioSocketChannel(this, ch)创建一个netty封装的channel(可以看到服务端是通过工厂模式反射,而这用的New instance)

    new NioSocketChannel(this, ch)
      //this就是当前的正在处理新连接的NioServeSocketChannel
      //ch就是new出来的jdk自带channel
    

    可以看到NioSocketChannel 的构造函数里面分为了两个步骤

      public NioSocketChannel(Channel parent, java.nio.channels.SocketChannel socket) {
            super(parent, socket);   //调用父类构造函数
            this.config = new NioSocketChannel    //新建绑定的config类
              .NioSocketChannelConfig(this, socket.socket());
        }
    
    • 2.1 调用父类构造函数AbstractNioByteChannel()

        protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
              super(parent, ch, 1);   //   " 1 "  就是OP_READ
          }
      
      • 2.1.1 configureBlocking(false)设置非阻塞,save op将感兴趣的事件(OP_READ)保存到成员变量;

        继续调用其父类构造函数AbstractNioChannel

        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);   //可以看到这里把原生的channel设置为非阻塞
            }
            //...
        
      • 2.1.2 创建与之关联的id ,unsafe ,pipline

        基于AbstractNioChannel,继续调用父类AbstractChannel 的构造函数,配置如下

            protected AbstractChannel(Channel parent) {  //parent是服务端channel
                this.parent = parent;
                this.id = this.newId();
                this.unsafe = this.newUnsafe();
                this.pipeline = this.newChannelPipeline();
            }
        
    • 2.2 创建与channel绑定的配置类NioSocketChannelConfig(),注意这里是一个NioSocketChannel内部类。

        private final class NioSocketChannelConfig 
                                    extends DefaultSocketChannelConfig {
              private NioSocketChannelConfig(NioSocketChannel channel,
                                                Socket javaSocket) {
                  super(channel, javaSocket);
              }
              //...
          }
      
      • setTcpNoDelay(true)禁止Nagle算法,将小数据包无延迟发送,保证数据实时性;这里的调用是在父类DefaultSocketChannelConfig 构造函数里面实现

        public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
                super(channel);
                if (javaSocket == null) {
                    throw new NullPointerException("javaSocket");
                } else {
                    this.javaSocket = javaSocket;
                    //这里的值  = !isAndroid(); 因为不是android所以是  !false=true
                    if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
                        try {
                            this.setTcpNoDelay(true);//设置Nodelay=true,调用到jdk底层
                        } catch (Exception var4) {
                        }
                    }
                }
            }
        

    X. 题外话 Netty中的 Channel 分类

    我们自己所使用的channel有

    • 服务端 NioServeSocketChannel 通过工厂 反射实现
    • 客户端 NioSocketChannel 通过连接触发serveSocketChannel的read进行 new产生
    • unsafe 客户端channel构造的时候 产生

    而所有channel的继承关系大致如下


    image.png
    • Channel 是所有类的父类,含有pipline eventloop parent channelId 等,是一个基本骨架的存在
    • AbstractChannel 是Channel的子类,包含 select的监听 ,含有SelectionKey SelectableChannel(jdk底层的key和channel)以及阻塞与否,感兴趣的事件等
    • AbstractNioByteChannel是客户端Channel的父类,依赖于 NioByteUnsafe(将其作为成员变量)
      • NioSocketChannel客户端channel 关心的事件是OP_READ
      • NioByteUnsafe (通过 newUnsafe()来实例化)
    • AbstractNioMessageChannel是服务端channel的父类,依赖于 NioMessageUnsafe(将其作为成员变量)
      • NioServeSocketChannel服务端channel 关心的事件是OP_ACCEPT
      • NioMessageUnsafe(通过 newUnsafe()来实例化)

    对于unsafe(),封装了channel的读写事件,对于服务端来说,read()会调用doReadMessage()中的jdk read方法接收请求;对客户端来说read()会调用doReadBytes()读取放入ByteBuffer中


    3.新连接NioEventLoop的分配和selector的注册

    首先我们回顾服务端channel的创建过程;在服务端channel创建后会通过init()初始化,其最后又几行逻辑:

    ServerBootstrap

        void init(Channel channel) throws Exception {
            //...
            ch.eventLoop().execute(new Runnable() {
                        public void run() {
                            pipeline.addLast(
                                new ChannelHandler[]{new ServerBootstrap
                                .ServerBootstrapAcceptor(
                                    currentChildGroup,
                                    currentChildHandler,
                                    currentChildOptions,
                                    currentChildAttrs)});
                        }
                    });
        }
    

    可以看到服务端channel的Pipline中加入了一个ServerBootstrapAcceptor作为adptor,这个adptor会在接收到新连接的时候其作用;同时传入的还有用户自定义的Options和Attributes以及Handler,这些自定义的东西会在创建每个新连接的channel中给他们赋值,设置。

    对于新连接,首先执行上述两部分的任务:NioEventLoop中拿到Key后会调用processSelectedKey()来处理channel,里面会调用channel的unsafe来执行unsafe.read(),unsafe是NioMessageChannel中的内部类read()函数中通过doReadMessage()通过jdk底层accept获取到连接的channel;

    然后调用pipeline.fireChannelRead()交给自动添加的adptor进行处理:
    AbstractNioMessageChannel

        private final class NioMessageUnsafe extends AbstractNioUnsafe {
            public void read() {
                 //...
                 AbstractNioMessageChannel.this.doReadMessages(this.readBuf);
                 //...
                    for(int i = 0; i < localRead; ++i) {
                        AbstractNioMessageChannel.this.readPending = false;
                        pipeline.fireChannelRead(this.readBuf.get(i));//传给上份代码中的adptor
                    }
            }
        }
    

    ServerBootstrapAcceptor中,主要有三件事:

    • 添加childHandler

      经历层层调用,到了ServeBootstrap的channelRead()

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
              final Channel child = (Channel)msg;
              //这里将 用户在初始化Bootstrap的时候,手动通过Initializer添加的handler
              //放入这个channel的pipline中
              //initializer初始化添加之后将自己remove释放
              child.pipeline().addLast(new ChannelHandler[]{this.childHandler});
              //...
          }
      
    • 设置options 和attributes

      同样在ServeBootstrap 的channelRead()中 ,通过拿到channel的config类对channel进新option的设置,这些option也来自用户代码自定义,attribute同理;这些option和attibute都是通过在创建之前在pipline中添加的ServerBootstrapAcceptor初始化的时候传入的

                try {
                      if (!child.config()
                                .setOption((ChannelOption)e    //设置option
                                .getKey(), e.getValue())) {
                              ServerBootstrap.logger.warn("Unknown channel option: " + e);
                      }
                  }   
                //...
                for(i$ = 0; i$ < len$; ++i$) {
                      e = arr$[i$];                              //获取自定义属性
                      child.attr((AttributeKey)e.getKey()).set(e.getValue());//绑定属性
                  }
      
    • 为新连接选择NioEventLoop并注册selector

      同样通过channelRead(),调用 this.childGroup.register(child)完成这个逻辑,这里的childGroup就是有很多线程(EventLoop)的负责客户端读写的workGroup;workGroup里面通过chooser的.next()来决定是哪个线程/EventLoop来绑定,对应的NioEventLoop通过register注册channel,

      MultithreadEventExecutorGroup(父类)

        //这里的eventloopGroup是workGroup(也就是很多线程响应客户端读写那个)
        public EventExecutor next() {
              return this.chooser.next();
              //chooser在上一篇文章有讲过
              //通过idx++&(length-1)确定是哪个eventLoop
          }
      

      然后层层调用,就到了最终对jdk底层的调用注册上去,注册是时候通过注册函数中的attachment参数的方式将封装的nettyChannel放进去,当事件发生的时候通过获取attachment获取到netty封装好的channel

    4 客户端NioSocketChannel中读事件的注册

    上一步绑定到selector后其实并没有结束,如果不做任何操作是不会产生任何事件的,原因在于绑定的channel并没有设置感兴趣的事件,而selector只有感兴趣的事件发生才会检测到;所以在register至置定的EventLoop之后,会通过设置感兴趣的事件为readInterestOp:
    AbstractNioChannel

        protected void doBeginRead() throws Exception {
            SelectionKey selectionKey = this.selectionKey;
            if (selectionKey.isValid()) {
                this.readPending = true;
                int interestOps = selectionKey.interestOps();
                //一般情况下interstOps=0 表示对任何事件不敏感
                if ((interestOps & this.readInterestOp) == 0) {
                    selectionKey.interestOps(interestOps | this.readInterestOp);
                    //所以这里设置了对读事件敏感
                }
            }
        }
    

    相关文章

      网友评论

          本文标题:Netty源码分析3:新连接接入

          本文链接:https://www.haomeiwen.com/subject/lezwuhtx.html