美文网首页
Netty源码分析1:启动过程

Netty源码分析1:启动过程

作者: LucasHao | 来源:发表于2020-03-16 21:14 被阅读0次

    1.新建bootstrap(用户调用层)

    //serverbootstrap初始化什么也不做
    public ServerBootstrap() {
        }
    

    2.为Bootstrap新增组件(用户调用层)

    b.group(bossGroup, workerGroup)
           .channel(NioServerSocketChannel.class)
           .childOption(ChannelOption.TCP_NODELAY, true)
           .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
           .handler(new ServerHandler())
           .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new AuthHandler());  
                 }
            });
    

    3.正式启动(进入源码):

    ChannelFuture f = b.bind(8888).sync();
    
    • 四大步骤启动:
      • 创建服务端channel
      • 初始化服务端channel
      • 注册selector
      • 端口绑定

    对应代码调用:

    • AbstractBootstrap bind()
      • initAndRegister()->channelFactory.newChannel();//制造一个channel
      • initAndRegister()->init(channel);//初始化channel
      • initAndRegister()->register//注册selector
      • dobind()//绑定端口

    3.1创建服务端channel

    通过绑定.class,工厂反射创建channel;

    AbstractBootstrap.java

        public ChannelFuture bind(int inetPort) {
            return this.bind(new InetSocketAddress(inetPort));//step:1
        }
    ...
        public ChannelFuture bind(SocketAddress localAddress) {
            this.validate();
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            } else {
                return this.doBind(localAddress);//step:2
            }
        }
    ...
        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = this.initAndRegister();//step:3    //...
        }
    ...
        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = this.channelFactory.newChannel();//3.1内容制造一个channel
                this.init(channel);// 3.2内容:初始化
            } 
        //...
        }
    

    可以看到通过channelFactoryinitAndRegister()制造了一个channel,channelFactory的产生代码

    ReflectiveChannelFactory.java

        private final Class<? extends T> clazz;
    ...
        public T newChannel() {
            try {
                return (Channel)this.clazz.newInstance();//反射机制
            }
            //...
        }
    

    这里显然是在之前已经绑定好了工厂生产的产品,是在用户自己初始化的时候绑定的

    b.group(bossGroup, workerGroup)
           .channel(NioServerSocketChannel.class)
           .....
    

    是在AbstractBootstrap.java 中这个函数创建了上面的工厂,并对channel类型绑定

        public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            } else {
                return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
            }
        }
    
    3.1.2 NioServerSocketChannel实例化的过程

    1.反射的东东看完了,来看下channel是怎么new 出来的:

    NioServerSocketChannel.java

        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
    //...
        private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
            try {
                return provider.openServerSocketChannel();
            } 
            //...
        }
    

    而其中的这个provider是

    NioServerSocketChannel.java

    import java.nio.channels.spi.SelectorProvider;
    class NioServerSocketChannel{
        //...
        private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER =                                                                            SelectorProvider.provider();
        //...
    }
    

    2.使channel非阻塞

    NioServerSocketChannel.java

        public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
            super((Channel)null, channel, 16);// step1:调用父类AbstractNioMessageChannel
            //...
        }
    

    上述代码调用AbstractNioMessageChannel的构造函数,而后者也是直接调用AbstractNioChannel的构造函数,对于16这个参数:应该是TCP的Accept属性,将这个服务端socket设为接收请求的socket;

    AbstractNioChannel.java

        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            //...
            try {
                ch.configureBlocking(false);//step2:这一部将jdk底层调用产生的channel变成非阻塞
            }
            //...
        }
    

    3.创建channel有关的的id,unsafe,pipline等

    在2的最后一步try非阻塞前,AbstrctNioChannel调用其父类AbstrctChannel

    AbstrctChannel.java

        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            this.id = this.newId();  //channel唯一id
            this.unsafe = this.newUnsafe();//tcp读写底层类
            this.pipeline = this.newChannelPipeline();//pipline创建
        }
    

    4.然后回到NioServerSocketChannel,TCP参数配置存放在这个NioServerSocketChannelConfig里面

        public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
            //...
            this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(
                this, this.javaChannel().socket()
            );
        }
    

    3.2初始化服务端channel

    在init函数里面有四个步骤:

    3.2.1 配置自身channel的option
        void init(Channel channel) throws Exception {
            Map<ChannelOption<?>, Object> options = this.options0();
            synchronized(options) {
                channel.config().setOptions(options);// 1
            }
            Map<AttributeKey<?>, Object> attrs = this.attrs0();//2
            //...
        }    
    

    这一块的option默认是空的,而设置的地方就是在下图中最开始bootstrap的那一堆链式调用初始化里面自己指定,然后在上面代码中options()调用获取

    image.png
    同理,也会顺着将创建channel前就设置的attribute正式绑定到channel(如上面代码 //2)
    3.2.2配置客户端产生连接后的childChannel的option

    这一步类似上一步,获取到childChannel的option和attribute,但这里是暂存,暂存原因在3.2.4解释:

        void init(Channel channel) throws Exception {
            //...3.2.1部分...
            synchronized(this.childOptions) {
                    currentChildOptions = (Entry[])this.childOptions
                                        .entrySet()
                                        .toArray(newOptionArray(this.childOptions.size()));
                    }
    
            final Entry[] currentChildAttrs;
            synchronized(this.childAttrs) {
                    currentChildAttrs = (Entry[])this
                                        .childAttrs
                                        .entrySet()
                                        .toArray(newAttrArray(this.childAttrs.size()));
                }
    
    3.2.3配置服务端的pipline

    3.1中看到服务端的serversocketchannel绑定了一个Pipline,这里主要在pipline里面添加了一个channelhandler

    这个handler也是用户在bootstrap一堆链式调用里面设置的,没有就算了

        void init(Channel channel) throws Exception {
            //...3.2.1    3.2.2部分...
            ChannelPipeline p = channel.pipeline();
            p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
                public void initChannel(Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = ServerBootstrap.this.config.handler();
                    if (handler != null) {
                        pipeline.addLast(new ChannelHandler[]{handler});
                    }
                    //...3.2.4
                }
            
    
    3.2.4添加ServerBootstrapAcceptor用于把用户连接分配给线程

    这里是个核心逻辑,本来就是一个普通的Nio封装,但是这一步将Acceptor作为了handller放在了最前面;

    p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
                public void initChannel(Channel ch) throws Exception {
                    //... 3.2.3 ...内容
                    ch.eventLoop().execute(new Runnable() {
                        public void run() {
                            pipeline.addLast(new ChannelHandler[]{
                                new ServerBootstrap
                                    .ServerBootstrapAcceptor(currentChildGroup  // here
                                    ,currentChildHandler //之前暂存的用户配置的handler放在这个后面
                                    ,currentChildOptions, //之前暂存的用户配置的op放在这个后面 
                                    currentChildAttrs)}); //之前暂存的用户配置的attr放在这个后面
                        }
                    });
                }
            }});
    

    至此,已经完成了启动过程的2/4步;

    3.3 注册selector

    同样在abstractBootStrap.java的initAndRegister(),完成上述两步之后,有一个函数

    ChannelFuture regFuture = this.config().group().register(channel);
    

    这里调用了AbstractChannel.register(channel),又分为几步

    • this.eventloop=eventloop 绑定线程
    • regitster0() 核心注册逻辑
      • doRegister()调用jdk把jdk原生channel注册到selector
      • 回调a:invokeHandlerAddedifNeed() 触发用户在Handler中编写的,channel注册的时候回调函数
      • 回调b:fireChannelRegister() 将注册成功的这个事件通知listener,触发用户在Handler中编写的的回调
      • 回调c:fireChannelActive();同上,都是用户写的回调触发
    3.3.1绑定eventLoop

    AbstractChannel.java

    channel绑定eventLoop后,其后续的事件都会在eventLoop中

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        //....
        AbstractChannel.this.eventLoop = eventLoop;  //3.3.1
        if (eventLoop.inEventLoop()) {
                        this.register0(promise);     //3.3.2 a
                    } else {
                        try {
                            eventLoop.execute(new Runnable() {
                                public void run() {
                                    AbstractUnsafe.this.register0(promise);//3.3.2 b
                                }
                            });
                        }
        //....
    
    3.3.2 注册selector

    在《java读源码之netty》中,注册是执行的上述代码的3.3.2 a,然而自己调试的时候是在3.3.2 b处,看了下这个判断函数,是为了判断channel绑定的EventLoop启动线程与当前线程相同,相同表示已经启动,不同则有两种可能:未启动或者线程不同

    public boolean inEventLoop(Thread thread) {
            return thread == this.thread;
    }
    

    因为这个监听的ServerSocketChannel是第一个channel,此时还没有已经开启运行的eventloop,这个时候就需要执行3.3.2b的启动了,eventLoop新开了一个线程,并且将channel在里面绑定;

    但不管是3.2.2a 还是3.2.2b,最终都进入了——

    private void register0(ChannelPromise promise) {
            //...
            AbstractChannel.this.doRegister();  //step 1
            //...
            AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();//step 2a
                    this.safeSetSuccess(promise);
                    AbstractChannel.this.pipeline.fireChannelRegistered();//step 2b
                    if (AbstractChannel.this.isActive()) {
                        if (firstRegistration) {
                            AbstractChannel.this.pipeline.fireChannelActive(); //step 2c
                        } else if (AbstractChannel.this.config().isAutoRead()) {
                            this.beginRead();
                        }
                    }
    
    • step1 .doRegister()方法注册到selector

    AbstractNioChannel.java

        protected void doRegister() throws Exception {
            boolean selected = false;
    
            while(true) {
                try {
                    this.selectionKey = this.javaChannel()
                        .register(this.eventLoop().selector, 0, this);  //here
                    return;
                } 
    

    这里的this.javaChannel() this指的是netty封装后的channel,通过.javaChannel获取到其内部jdk底层的channel。

    .register( xx,xx,xx) 第一个参数就是正常的jdk底层selector,第二个参数是一些设置(这里没设置),this是指netty自己的封装的channel,作为一个attachment放进去,这样当channel事件发生,就能通过selector直接获取到netty封装好的channel。

    • step2

      • 回调a 对应用户在Handlder中写的handler中的
      @Override
          public void handlerAdded(ChannelHandlerContext ctx) {
              System.out.println("handlerAdded");
          }
      
      • 回调b 对应用户在Handlder中写的handler中的
      @Override
          public void channelRegistered(ChannelHandlerContext ctx) {
              System.out.println("channelRegistered");
          }
      
      
      • 回调c 对应用户在Handlder中写的handler中的,但是这里并不是和step a b 紧接着调用,而是在active之后(第四步的端口绑定后)再次调用这个函数后才会active
      @Override
          public void channelActive(ChannelHandlerContext ctx) {
              System.out.println("channelActive");
          }
      

    3.4 端口绑定

    前面三步都是通过bind()调用doBind()再调用的initAndRegister()中实现,而端口绑定则是在doBind()中调用dobind0()实现,跳出了之前所关注的焦点initAndRegitser()

    AbstractBootstrap.java中的dobind0()最终会调用AbstractChannel.javaBind()方法

        public final void bind(SocketAddress localAddress, ChannelPromise promise) {
                //...
    
                    try {
                        AbstractChannel.this.doBind(localAddress);   //step 3.4.1
                    } 
                //...
                    if (!wasActive && AbstractChannel.this.isActive()) {
                        this.invokeLater(new Runnable() {
                            public void run() {
                                AbstractChannel.this.pipeline
                                    .fireChannelActive();//step 3.4.2也就是在这调用用户的channelActive方法
                            }
                        });
                    }
    
    
    • step 3.4.1 这一步最终调用的底层的channel的jdk原生bind绑定端口
    • step 3.4.2 fireChannelActive()触发pipline上的一系列active回调,最后将端口设置为允许接收请求

    相关文章

      网友评论

          本文标题:Netty源码分析1:启动过程

          本文链接:https://www.haomeiwen.com/subject/pxgoehtx.html