美文网首页
Netty4源码深入学习1-服务启动

Netty4源码深入学习1-服务启动

作者: 未名枯草 | 来源:发表于2018-04-26 17:55 被阅读51次

    1.源码准备

    试用版本为


    引用包版本

    Netty 服务端创建的时序图,如下:


    服务端创建的时序图

    2.服务端启动代码说明:

    举例:服务端启动代码:


    服务端启动代码

    开启一个服务端,端口绑定在8888,使用nio模式,下面讲下每一个步骤的处理细节

    1)EventLoopGroup
     是一个死循环,不停地检测IO事件,处理IO事件,执行任务,后面详细描述;
     初始化用于Acceptor的主"线程池"以及用于I/O工作的从"线程池";
    
    2)ServerBootstrap
    初始化ServerBootstrap实例, 此实例是netty服务端应用开发的入口是服务端的一个启动辅助类;
    通过给它设置一系列参数来绑定端口启动服务;
    
    3).channel(NioServerSocketChannel.class)
    指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel;
    表示服务端启动的是nio相关的channel;
    channel在netty里面是一大核心概念,可以理解为一条channel就是一个连接或者一个服务端bind动作;
    
    4)b.childHandler(new NettyServerFilter())
     表示一条新的连接进来之后,该怎么处理,
    

    NettyServerFilter代码如图中所示:


    NettyServerFilter代码
    5)ChannelFuturef =b.bind(port).sync()
    这里就是真正的启动过程了,绑定6789端口,等待服务器启动完毕,才会进入下行代码。
    

    3. 详细描述

    跳入bind()方法:
    bind()方法
    通过端口号创建一个 InetSocketAddress,然后继续step,进入下一步:
    image.png
    其中validate()方法用于验证服务启动需要的必要参数是否合格
    ivalidate()
    跳入上一层校验,校验内容如下,group及channelFactory是否为空;
    image.png
    完成校验以后进行doBind()方法:
    doBind()
    dobind()内部实现,主要有两个核心内容。两大核心一个是 initAndRegister(),以及doBind0();
    a)首先看initAndRegister() 方法:
    initAndRegister() 方法
    核心代码如图中箭头部分;
    1. new 一个channel,
    2. init这个channel,即调用init(channel)初始化通道信息
    3. 将这个channel register到某个对象。
    
    <1> new 一个channel,step进入
    channel = channelFactory().newChannel();此处相比以前版本去掉final并放入try-catch中
    调用channelFactory生成通道channel实例,
    NioServerSocketChannel 作为clazz,是通过serverbootstrap的channel方法来指定通道类型。
    
    进一步查看channleFactory的初始化,

    此时回到NettyServer, Netty实现初始化AbstractBootstrap的位置,此时,在代码中存在

     b.channel(NioServerSocketChannel.class); // 设置nio类型的channel
    

    查找此方法,发现在AbstractBootstrap类中存在channel方法,在方法channel中初始化new一个factory:

    channelFactory初始化
    ReflectiveChannelFactory继承了ChannelFactory工厂方法newChannel
    所以,initAndRegister中的channelFactory.newChannel()方法就是生成了一个NioServerSocketChannel的实例。
    
    image.png
      clazz.newInstance() 是通过反射的方式来创建一个对象,
     而这个clazz就是我们在ServerBootstrap中传入的NioServerSocketChannel.class
    

    进一步step,会初始化一系列变量,最终调用反射的clazz类,即NioServerSocketChannel,进行初始化,


    NioServerSocketChannel初始化
      private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
      //DEFAULT_SELECTOR_PROVIDER 为一个selector,
    

    逐渐step,会跳入到NioServerSocketChannel构造方法中,


    NioServerSocketChannel构造方法
    [NIO SelectionKey中定义的4种事件]
      *   SelectionKey.OP_ACCEPT —— 接收连接继续事件,**表示服务器监听到了客户连接,服务器可以接收这个连接了**
    
      *   SelectionKey.OP_CONNECT —— 连接就绪事件,**表示客户与服务器的连接已经建立成功**
    
      *   SelectionKey.OP_READ —— 读**就绪**事件,**表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)**
    
      *   SelectionKey.OP_WRITE —— 写**就绪**事件,**表示已经可以向通道写数据了(通道目前可以用于写操作)**
    
       这里 注意,下面两种,SelectionKey.OP_READ ,SelectionKey.OP_WRITE ,
    
      1.当向通道中注册SelectionKey.OP_READ事件后,如果客户端有向缓存中write数据,下次轮询时,则会 isReadable()=true;
    
      2.当向通道中注册SelectionKey.OP_WRITE事件后,这时你会发现当前轮询线程中isWritable()一直为ture,如果不设置为其他事件
    

    进一步往构造方法super上层查看,最后跳入:


    image.png
      将前面 provider.openServerSocketChannel(); 创建出来的 ServerSocketChannel保存到成员变量ch
      然后调用ch.configureBlocking(false);设置该channel为非阻塞模式
      这里的 readInterestOp 即前面层层传入的 SelectionKey.OP_ACCEPT,
    

    接下来重点分析 super(parent);(这里的parent其实是null,由前面写死传入);
    在AbstractNioChannel中做了下面几件事:

      1、继续调用父类AbstractChannel(Channel parent)构造方法;
      此构造方法中,主要做了三件事:
      1)、给channel生成一个新的id
      2)、通过newUnsafe初始化channel的unsafe属性
      3)、pipeline =new DefaultChannelPipeline(this) 初始化channel的pipeline属性
    
    image.png
      2)在AbstractChannel类中,newUnsafe()是一个抽象方法
    
    image.png

    最终实现来自于AbstractNioMessageChannel类中有newUnsafe()的实现


    image.png
           此方法返回一个NioMessageUnsafe实例对象,
           而NioMessageUnsafe是AbstractNioMessageChannel的内部类
           NioMessageUnsafe 只覆盖了 父类AbstractNioUnsafe中的read方法,如下图
           通过NioMessageUnsafe 及其父类的代码便可以知道,
           其实unsafe对象是真正的负责底层channel的连接/读/写等操作的,
           unsafe就好比一个底层channel操作的代理对象
    
    NioMessageUnsafe
      OP_ACCEPT都已经注册上了,当接收到新用户连接时就会触发unsafe.read()方法。
     read()会不断调用doReadMessages(),将产生的readBuf逐一发送给Pipeline.fireChannelRead()去处理。
    
    
        private final class NioMessageUnsafe extends AbstractNioUnsafe {
    
            private final List<Object> readBuf = new ArrayList<Object>();
    
            @Override
            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    

    unsafe内容后续学习;

          3)  pipeline =new DefaultChannelPipeline(this);
          # step之后发现,最后实现如下图代码所示:
          # 初始化了HeadContext及TailContext对象。
          # head及tail初始化完成后,它们会相互连接。
          # 通过上面的代码可以得出,pipeline就是一个双向链表。
    
    DefaultChannelPipeline构造函数
    <1>部分总结:
    用户调用方法 Bootstrap.bind(port) 第一步就是通过反射的方式new一个NioServerSocketChannel对象,
    并且在new的过程中创建了一系列的核心组件,进一步研究:
    
    1、NioServerSocketChannel对象内部绑定了Java NIO创建的ServerSocketChannel对象;
    
    2、Netty中,每个channel都有一个unsafe对象,此对象封装了Java NIO底层channel的操作细节;
    
    3、Netty中,每个channel都有一个pipeline对象,此对象就是一个双向链表;
    
    NioServerSocketChannel的类继承结构图:
    NioServerSocketChannel的类继承结构图
    <2> init这个channel
    image.png
    上面代码前几行主要进行配置,
    重要内容为最后一行
    获取当前通道的pipeline,然后为 NioServerSocketChanne l绑定的 pipeline 添加 Handler;
    此pipeline即为上面<2>中生成的pipeline
    

    相关文章

      网友评论

          本文标题:Netty4源码深入学习1-服务启动

          本文链接:https://www.haomeiwen.com/subject/mmhmlftx.html