美文网首页深入浅出Netty源码剖析Java学习笔记
Netty源码分析:1.4服务器启动流程

Netty源码分析:1.4服务器启动流程

作者: 蓝汝丶琪 | 来源:发表于2018-04-17 17:59 被阅读32次

    第一章节是主要是服务器启动的代码分析。章节目录有:
    |---------1.1初始化NioEventLoopGroup
    |---------1.2初始化NioEventLoop
    |---------1.3初始化NioServerSocketChannel
    |---------1.4服务器启动流程
    为什么先从初始化开始了解服务器启动?
    因为在我看服务器启动的相关源码的时候,有很多地方都是初始化的时候已经建立好的。所以我就从初始化的源码开始看起。这是我第一次看源码的笔记,仍有很多理解错误的地方和不解的地方。欢迎讨论。

    本篇目录

    • 启动服务器代码
    • 代码分析

    启动服务器代码

    • 1 创建NioEventLoopGroup对象。上文已经介绍了NioEventLoopGroup的初始化已经内部线程NioEventLoop的初始化话。
    • 2 创建ServerBootstrap对象,该类初始化主要是创建了几个LinkedHashMap来存储设置。例如childOptions或者childAttrs
     public void bind() {
     
             //1 
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup work = new NioEventLoopGroup();
            try {
    //            引导绑定和启动服务器
                //2
                 ServerBootstrap b = new ServerBootstrap();
                 //3
                 b.group(boss, work);
    //            创建NioEventLoopGroup对象来处理事件,如接受新连接、接收数据、写数据等等
                //4
                 b.channel(NioServerSocketChannel.class);
    //            设置childHandler执行所有的连接请求
                //5
                 b.childHandler(new ChildChannelHandler());
                 //6
                 b.option(ChannelOption.SO_BACKLOG, 100);
                 
                //绑定端口
                //7
                ChannelFuture future = b.bind(8080).sync();
    
                //8
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //9
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
    
        }
    

    3 Group

    赋值两个线程池。
    1 首先第一个boss线程池是赋值到父类AbstractBootstrap的group变量中。
    2 work线程池就赋值在ServerBootstrap的childGroup变量中。

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
            //1
            super.group(parentGroup);
            if (childGroup == null) {
                throw new NullPointerException("childGroup");
            }
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.childGroup = childGroup;
            return this;
        }
    

    4 channel()

    • 首先ReflectiveChannelFactory类是一个工厂类,里面只有一个方法newChannel()用来将传入的class进行无参构造生成对象的。
    • 然后channelFactory()是将ReflectiveChannelFactory赋予ServerBootstrap的父类的channelFactory参数

    此时channel 还没被创建,直到bind()方法中调用initAndRegister()方法的时候才会用该工厂类生成一个channel。下文会讲。

     public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
    

    5 childHandler()

    在调用这个childHandler()这个方法的时候,你需要自己写一个方法继承ChannelInitializer类。而ChannelInitializer类也是继承ChannelInboundHandlerAdapter的。所以childHandler()方法的参数就是我们自己写的类。然后赋值到childHandler参数。

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
            if (childHandler == null) {
                throw new NullPointerException("childHandler");
            }
            this.childHandler = childHandler;
            return this;
        }
    

    6 Option()

    将一些配置存储到options变量中,该变量是一个LinkedHashMap

    public <T> B option(ChannelOption<T> option, T value) {
            if (option == null) {
                throw new NullPointerException("option");
            }
            if (value == null) {
                synchronized (options) {
                    options.remove(option);
                }
            } else {
                synchronized (options) {
                    options.put(option, value);
                }
            }
            return (B) this;
        }
    

    7 bind()

    image.png

    之前的一些方法都只是对ServerBootstrap的配置,说白了就是用来set参数的。
    bind()则是开始启动服务器了。

    • 1 进行对groupchannelFactory两个参数进行非空验证
    public ChannelFuture bind(SocketAddress localAddress) {
            validate(); //1 
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            return doBind(localAddress);
        }
    

    doBind()

    • 1 创建一个channel 并且初始化和注册。关键部分。代码分析看下面
    • 2 判断channel是否注册成功。如果已经注册成功那么就进行doBind0()方法。如果还没成功,那就添加一个监听器,等返回成功的时候就进行doBind0()方法。解析看下文。
     private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister(); //1
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
            //2
            if (regFuture.isDone()) {
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else { 
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                           
                            promise.setFailure(cause);
                        } else {
                          
                            promise.registered();
    
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    
    

    doBind0()

    • 1 该方法是执行在channel已经与selector注册后的。给线程添加一个任务。该任务是绑定端口的。

    到这个方法。服务器启动就已经完成了

     private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
            //1
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    

    下文都是对initAndRegister()方法的代码解析

    initAndRegister()

    • 1 利用工厂创建一个channel对象。初始化内容看上一篇文章初始化NioServerSocketChannel
    • 2 对channel进行一个初始化,看下文。
    • 3 对channel进行注册,看下文
    final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
               //1
                channel = channelFactory.newChannel();
                //2
                init(channel);
            } catch (Throwable t) {
              
            }
            //3
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
            return regFuture;
        }
    

    init()

    • 1 对channel 的config变量里添加options属性
    • 2 对channel 的config变量里添加attrs属性
    • 3 获取channel 对象的pipeline管道,然后在管道里面添加一个handler,该handler作用有:添加bootstrap里的handler。对channel 添加一个任务。
    • 4 添加bootstrap里的handler。对channel 添加一个任务
    void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options0();
            //1 
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            //2
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
            
            //3
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
            
            //4
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    
    

    config().group().register(channel)

    • config().group()是获取到线程池,该线程池是ServerBootstrap的父类AbstractBootstrap存储的boss线程池。是不是恍然大悟,这就是把channel放到boss线程池里的一个线程里面去执行任务啊。
    • 1 是MultithreadEventLoopGroup类的方法 也就是NioEventLoopGroup的父类。 该方法就是从线程池里获取一个EventLoop.然后执行EventLoop里的register()方法。
     @Override
        public EventLoop next() {
            return (EventLoop) super.next();
        }
    @Override
        public ChannelFuture register(Channel channel) {
            return next().register(channel); //1
        }
    

    继续看下去。

    • 2 是SingleThreadEventLoop里的方法。``DefaultChannelPromise()` 方法是给channel 与该线程 添加一个监听器。
    @Override
        public ChannelFuture register(Channel channel) {
           //2
            return register(new DefaultChannelPromise(channel, this));
        }
    

    继续往下看

    • 3 继续是SingleThreadEventLoop里的方法。promise是监听器对象,promise.channel()获取到channel。unsafe()方法是实现底层的register,read或者write操作
     @Override
        public ChannelFuture register(final ChannelPromise promise) {
            //3
            ObjectUtil.checkNotNull(promise, "promise");
            promise.channel().unsafe().register(this, promise);
            return promise;
        }
    

    继续往下分析,该register()方法是AbstractChannel类的。也就是初始化NioServerSocketChannel的时候,建立pipeline和unsafe的类。

    • 4 进行参数验证
    • 5 将该eventLoop线程赋值于channel参数。
    • 6 eventLoop.inEventLoop() 判断,如果现在的线程是EventLoop()的 线程,那么执行任务,如果不是那么就用执行器执行任务。在这里debug,会返回false,会调用execute()方法。
      下面继续探讨
    • 7 下面探讨下register0()方法
     @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                //4
                if (eventLoop == null) {
                    throw new NullPointerException("eventLoop");
                }
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
                //5
                AbstractChannel.this.eventLoop = eventLoop;
                //6
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                            //7
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                      
                    }
                }
            }
    

    首先在研究register0()之前有一个小知识。

    在版本4.1.x 的时候,在初始化eventLoop的时候,还没有创建线程,而是保存了Executor这个变量。这个变量在4.0.x版本的时候是没有的。那么4.1.x版本在什么时候创建线程呢?在eventLoop调用execute()方法的时候创建线程。下文可以看到

    • 1 继续判断是否现在的线程是EventLoop对象的线程,肯定返回false,因为EventLoop线程里的thread变量是null嘛。
     @Override
        public boolean inEventLoop(Thread thread) {
            return thread == this.thread;
        }
    
    image.png
    • 2 在startThread()方法中,才创建一个线程。
    • 3 在任务队列里添加一个任务。
    public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
            //1
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                 //2
                startThread();
                //3
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    

    register0(ChannelPromise promise)

    • 1 判断EventLoop线程是否还存活。
    • 2 这个是记录是否注册过的。neverRegistered默认是true;
    • 3 这个是核心代码,到这里才进行将channel与selector注册在一起。
     private void register0(ChannelPromise promise) {
                try {
                    //1
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    //2
                    boolean firstRegistration = neverRegistered;
                    doRegister();//3
                    neverRegistered = false;
                    registered = true;
    
          
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                         
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                
                }
            }
    

    相关文章

      网友评论

        本文标题:Netty源码分析:1.4服务器启动流程

        本文链接:https://www.haomeiwen.com/subject/dsypkftx.html