美文网首页
Netty源码分析1:启动过程

Netty源码分析1:启动过程

作者: LucasHao | 来源:发表于2020-03-16 21:14 被阅读0次

1.新建bootstrap(用户调用层)

//serverbootstrap初始化什么也不做
public ServerBootstrap() {
    }

2.为Bootstrap新增组件(用户调用层)

b.group(bossGroup, workerGroup)
       .channel(NioServerSocketChannel.class)
       .childOption(ChannelOption.TCP_NODELAY, true)
       .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
       .handler(new ServerHandler())
       .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) {
                 ch.pipeline().addLast(new AuthHandler());  
             }
        });

3.正式启动(进入源码):

ChannelFuture f = b.bind(8888).sync();
  • 四大步骤启动:
    • 创建服务端channel
    • 初始化服务端channel
    • 注册selector
    • 端口绑定

对应代码调用:

  • AbstractBootstrap bind()
    • initAndRegister()->channelFactory.newChannel();//制造一个channel
    • initAndRegister()->init(channel);//初始化channel
    • initAndRegister()->register//注册selector
    • dobind()//绑定端口

3.1创建服务端channel

通过绑定.class,工厂反射创建channel;

AbstractBootstrap.java

    public ChannelFuture bind(int inetPort) {
        return this.bind(new InetSocketAddress(inetPort));//step:1
    }
...
    public ChannelFuture bind(SocketAddress localAddress) {
        this.validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        } else {
            return this.doBind(localAddress);//step:2
        }
    }
...
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = this.initAndRegister();//step:3    //...
    }
...
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = this.channelFactory.newChannel();//3.1内容制造一个channel
            this.init(channel);// 3.2内容:初始化
        } 
    //...
    }

可以看到通过channelFactoryinitAndRegister()制造了一个channel,channelFactory的产生代码

ReflectiveChannelFactory.java

    private final Class<? extends T> clazz;
...
    public T newChannel() {
        try {
            return (Channel)this.clazz.newInstance();//反射机制
        }
        //...
    }

这里显然是在之前已经绑定好了工厂生产的产品,是在用户自己初始化的时候绑定的

b.group(bossGroup, workerGroup)
       .channel(NioServerSocketChannel.class)
       .....

是在AbstractBootstrap.java 中这个函数创建了上面的工厂,并对channel类型绑定

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        } else {
            return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
        }
    }
3.1.2 NioServerSocketChannel实例化的过程

1.反射的东东看完了,来看下channel是怎么new 出来的:

NioServerSocketChannel.java

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
//...
    private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } 
        //...
    }

而其中的这个provider是

NioServerSocketChannel.java

import java.nio.channels.spi.SelectorProvider;
class NioServerSocketChannel{
    //...
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER =                                                                            SelectorProvider.provider();
    //...
}

2.使channel非阻塞

NioServerSocketChannel.java

    public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
        super((Channel)null, channel, 16);// step1:调用父类AbstractNioMessageChannel
        //...
    }

上述代码调用AbstractNioMessageChannel的构造函数,而后者也是直接调用AbstractNioChannel的构造函数,对于16这个参数:应该是TCP的Accept属性,将这个服务端socket设为接收请求的socket;

AbstractNioChannel.java

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        //...
        try {
            ch.configureBlocking(false);//step2:这一部将jdk底层调用产生的channel变成非阻塞
        }
        //...
    }

3.创建channel有关的的id,unsafe,pipline等

在2的最后一步try非阻塞前,AbstrctNioChannel调用其父类AbstrctChannel

AbstrctChannel.java

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        this.id = this.newId();  //channel唯一id
        this.unsafe = this.newUnsafe();//tcp读写底层类
        this.pipeline = this.newChannelPipeline();//pipline创建
    }

4.然后回到NioServerSocketChannel,TCP参数配置存放在这个NioServerSocketChannelConfig里面

    public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
        //...
        this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(
            this, this.javaChannel().socket()
        );
    }

3.2初始化服务端channel

在init函数里面有四个步骤:

3.2.1 配置自身channel的option
    void init(Channel channel) throws Exception {
        Map<ChannelOption<?>, Object> options = this.options0();
        synchronized(options) {
            channel.config().setOptions(options);// 1
        }
        Map<AttributeKey<?>, Object> attrs = this.attrs0();//2
        //...
    }    

这一块的option默认是空的,而设置的地方就是在下图中最开始bootstrap的那一堆链式调用初始化里面自己指定,然后在上面代码中options()调用获取

image.png
同理,也会顺着将创建channel前就设置的attribute正式绑定到channel(如上面代码 //2)
3.2.2配置客户端产生连接后的childChannel的option

这一步类似上一步,获取到childChannel的option和attribute,但这里是暂存,暂存原因在3.2.4解释:

    void init(Channel channel) throws Exception {
        //...3.2.1部分...
        synchronized(this.childOptions) {
                currentChildOptions = (Entry[])this.childOptions
                                    .entrySet()
                                    .toArray(newOptionArray(this.childOptions.size()));
                }

        final Entry[] currentChildAttrs;
        synchronized(this.childAttrs) {
                currentChildAttrs = (Entry[])this
                                    .childAttrs
                                    .entrySet()
                                    .toArray(newAttrArray(this.childAttrs.size()));
            }
3.2.3配置服务端的pipline

3.1中看到服务端的serversocketchannel绑定了一个Pipline,这里主要在pipline里面添加了一个channelhandler

这个handler也是用户在bootstrap一堆链式调用里面设置的,没有就算了

    void init(Channel channel) throws Exception {
        //...3.2.1    3.2.2部分...
        ChannelPipeline p = channel.pipeline();
        p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = ServerBootstrap.this.config.handler();
                if (handler != null) {
                    pipeline.addLast(new ChannelHandler[]{handler});
                }
                //...3.2.4
            }
        
3.2.4添加ServerBootstrapAcceptor用于把用户连接分配给线程

这里是个核心逻辑,本来就是一个普通的Nio封装,但是这一步将Acceptor作为了handller放在了最前面;

p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
            public void initChannel(Channel ch) throws Exception {
                //... 3.2.3 ...内容
                ch.eventLoop().execute(new Runnable() {
                    public void run() {
                        pipeline.addLast(new ChannelHandler[]{
                            new ServerBootstrap
                                .ServerBootstrapAcceptor(currentChildGroup  // here
                                ,currentChildHandler //之前暂存的用户配置的handler放在这个后面
                                ,currentChildOptions, //之前暂存的用户配置的op放在这个后面 
                                currentChildAttrs)}); //之前暂存的用户配置的attr放在这个后面
                    }
                });
            }
        }});

至此,已经完成了启动过程的2/4步;

3.3 注册selector

同样在abstractBootStrap.java的initAndRegister(),完成上述两步之后,有一个函数

ChannelFuture regFuture = this.config().group().register(channel);

这里调用了AbstractChannel.register(channel),又分为几步

  • this.eventloop=eventloop 绑定线程
  • regitster0() 核心注册逻辑
    • doRegister()调用jdk把jdk原生channel注册到selector
    • 回调a:invokeHandlerAddedifNeed() 触发用户在Handler中编写的,channel注册的时候回调函数
    • 回调b:fireChannelRegister() 将注册成功的这个事件通知listener,触发用户在Handler中编写的的回调
    • 回调c:fireChannelActive();同上,都是用户写的回调触发
3.3.1绑定eventLoop

AbstractChannel.java

channel绑定eventLoop后,其后续的事件都会在eventLoop中

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    //....
    AbstractChannel.this.eventLoop = eventLoop;  //3.3.1
    if (eventLoop.inEventLoop()) {
                    this.register0(promise);     //3.3.2 a
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            public void run() {
                                AbstractUnsafe.this.register0(promise);//3.3.2 b
                            }
                        });
                    }
    //....
3.3.2 注册selector

在《java读源码之netty》中,注册是执行的上述代码的3.3.2 a,然而自己调试的时候是在3.3.2 b处,看了下这个判断函数,是为了判断channel绑定的EventLoop启动线程与当前线程相同,相同表示已经启动,不同则有两种可能:未启动或者线程不同

public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
}

因为这个监听的ServerSocketChannel是第一个channel,此时还没有已经开启运行的eventloop,这个时候就需要执行3.3.2b的启动了,eventLoop新开了一个线程,并且将channel在里面绑定;

但不管是3.2.2a 还是3.2.2b,最终都进入了——

private void register0(ChannelPromise promise) {
        //...
        AbstractChannel.this.doRegister();  //step 1
        //...
        AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();//step 2a
                this.safeSetSuccess(promise);
                AbstractChannel.this.pipeline.fireChannelRegistered();//step 2b
                if (AbstractChannel.this.isActive()) {
                    if (firstRegistration) {
                        AbstractChannel.this.pipeline.fireChannelActive(); //step 2c
                    } else if (AbstractChannel.this.config().isAutoRead()) {
                        this.beginRead();
                    }
                }
  • step1 .doRegister()方法注册到selector

AbstractNioChannel.java

    protected void doRegister() throws Exception {
        boolean selected = false;

        while(true) {
            try {
                this.selectionKey = this.javaChannel()
                    .register(this.eventLoop().selector, 0, this);  //here
                return;
            } 

这里的this.javaChannel() this指的是netty封装后的channel,通过.javaChannel获取到其内部jdk底层的channel。

.register( xx,xx,xx) 第一个参数就是正常的jdk底层selector,第二个参数是一些设置(这里没设置),this是指netty自己的封装的channel,作为一个attachment放进去,这样当channel事件发生,就能通过selector直接获取到netty封装好的channel。

  • step2

    • 回调a 对应用户在Handlder中写的handler中的
    @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            System.out.println("handlerAdded");
        }
    
    • 回调b 对应用户在Handlder中写的handler中的
    @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            System.out.println("channelRegistered");
        }
    
    
    • 回调c 对应用户在Handlder中写的handler中的,但是这里并不是和step a b 紧接着调用,而是在active之后(第四步的端口绑定后)再次调用这个函数后才会active
    @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("channelActive");
        }
    

3.4 端口绑定

前面三步都是通过bind()调用doBind()再调用的initAndRegister()中实现,而端口绑定则是在doBind()中调用dobind0()实现,跳出了之前所关注的焦点initAndRegitser()

AbstractBootstrap.java中的dobind0()最终会调用AbstractChannel.javaBind()方法

    public final void bind(SocketAddress localAddress, ChannelPromise promise) {
            //...

                try {
                    AbstractChannel.this.doBind(localAddress);   //step 3.4.1
                } 
            //...
                if (!wasActive && AbstractChannel.this.isActive()) {
                    this.invokeLater(new Runnable() {
                        public void run() {
                            AbstractChannel.this.pipeline
                                .fireChannelActive();//step 3.4.2也就是在这调用用户的channelActive方法
                        }
                    });
                }

  • step 3.4.1 这一步最终调用的底层的channel的jdk原生bind绑定端口
  • step 3.4.2 fireChannelActive()触发pipline上的一系列active回调,最后将端口设置为允许接收请求

相关文章

网友评论

      本文标题:Netty源码分析1:启动过程

      本文链接:https://www.haomeiwen.com/subject/pxgoehtx.html