美文网首页
Netty Server端启动之源码分析

Netty Server端启动之源码分析

作者: 0爱上1 | 来源:发表于2019-05-09 14:09 被阅读0次

    本文基于上一篇Netty Server端实现,以解读源码的方式带领大家理解Server端是如何启动的,主要包括以下几点

    1. EventLoopGroup类的作用是什么?为什么要new两个EventLoopGroup实例

    2. ServerBootstrap类的作用是什么?以及它的build模式中如group()方法,channel()方法,childHandler()方法的分别做了哪些事?

    3. ServerBootstrap的bind方法做了哪些事?

    4. ChannelFuture 接口的作用是什么?

    接下来我们围绕这几个点一一去分析,先贴一下整个Server启动的代码,方便我们分析

    public class Server {
    
    // 监听端口
    private int port;
    private Server(int port) {
        this.port = port;
    }
    
    // 启动一个Server服务器
    private void start() throws InterruptedException {
        // 1.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 2.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    // 3.
                    .channel(NioServerSocketChannel.class)
                    // 4.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new ServerInboundHandler());
                        }
                    })
                    // 5.
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 6.
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
    
            System.out.println("Server is started");
            // 7.
            ChannelFuture f = serverBootstrap.bind(port).sync();
    
            // 8.
            f.channel().closeFuture().sync();
        }finally {
            // 9.
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    
    }
    
    public static void main(String[] args) throws InterruptedException {
    
        // 利用vm参数传递端口号,不传则默认8081
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }else{
            port = 8081;
        }
    
        // 启动server
        System.out.println("Server to starting... the port is: " + port);
        new Server(port).start();
    }
    }
    

    启动分析

    1. NioEventLoopGroup

    • UML类图结构
    NioEventLoopGroup.png

    我们跟踪\color{#DC143C}{NioEventLoopGroup}的构造函数,该类提供了多个重载的构造函数,最后的一个构造函数会调用super(...)的构造函数

    这里直接看其抽象父类\color{#DC143C}{MultithreadEventLoopGroup}

    public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    
        private static final int DEFAULT_EVENT_LOOP_THREADS;
    
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    

    这里简单介绍一下该类有一个私有的静态常量 DEFAULT_EVENT_LOOP_THREADS,还有一个static静态块用于初始化该常量值,即\color{red}{默认是cpu核心数的两倍}(在不指定系统参数io.netty.eventLoopThreads的情况下)

    当构造函数的参数nThreads值为0时就会取该常量值作为该EventLoop事件循环的线程数

    继续往下由调用了其抽象父类\color{#DC143C}{MultithreadEventExecutorGroup}的构造方法

    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    

    接下来重点分析该类的this(...)函数

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
    
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
        // 重点是这个children属性
        children = new EventExecutor[nThreads];
    
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 调用newChild() 方法初始化每一个children数组的元素
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
    
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    
        chooser = chooserFactory.newChooser(children);
    
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
    
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    

    接下来看newChild(...)方法,该方法为抽象方法,具体的执行为NioEventLoopGroup中类重写的方法

    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
    

    \color{#DC143C}{NioEventLoopGroup}

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    

    可见这里的children数组的元素在实例话的时候实际上是一个个的\color{#DC143C}{NioEventLoop} 对象,重点来了,我们重点分析这个NioEventLoop对象

    另外一点就是会为chooser属性赋值一个EventExecutorChooser事件循环选择器,该选择器的有一个next方法,作用是当有channel注册时,具体选择哪个事件循环EventExecutor(NioEventLoop)去注册

    • DefaultEventExecutorChooserFactory
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    

    可以看到这里根据每个事件循环group中事件循环的个数是否是2的次方,分别实例化不同的事件执行选择器,默认两个group的选择器都是PowerOfTwoEventExecutorChooser实例

    到这里先总结一下文章开头的第一个问题:

    Server端启动代码前new了两个NioEventLoopGroup对象,每个NioEventLoopGroup对象都有一个EventExecutor数组类型的children属性(实际上new的是NioEventLoop对象),而每个children数组的size即是在new NioEventLoopGroup时传入的参数

    NioEventLoopGroup.png

    2. NioEventLoop

    重点介绍这个类, 该类是Netty底层的核心类,继承了抽象类SingleThreadEventLoop(单线程事件循环),注册Channel到Selector,在事件循环中实现IO多路复用

    这里说下个人的理解,NioEventLoop一个人负责了Java NIO多路复用中的while(true)循环以及Selector的相关工作,包括register,cancel,select等工作,其实现基础是也是基于JDK的Selector实现的,也所以称之为事件循环

    • UML 类图
    NioEventLoop.png

    分析其源码之前我们先带着问题入手

    1. NioEventLoop如何实现循环?

    2. NioEventLoop如何实现Selector选择器相关功能?

    \color{red}{内部重点属性}

    • Nio Selector相关属性

    private Selector selector
    private Selector unwrappedSelector
    private SelectedSelectionKeySet selectedKeys
    private final SelectorProvider provider

    看到这里是不是豁然开朗?结合JDK NIO的源码我们发现NioEventLoop内部持有的就是JDK NIO的Selector,也就是利用它们实现了事件register, select

    • NioEventLoop 自身相关属性

    执行事件select时的策略器,即提供了一种能力去控制事件循环的行为,比如一个正在阻塞的select操作能被延迟或跳过,如果有事件需要被立即处理的话

    默认情况下属性被赋值为DefaultSelectStrategy实例
    private final SelectStrategy selectStrategy

    private volatile int ioRatio = 50
    private int cancelledKeys
    private boolean needsToSelectAgain

    ioRatio属性控制IO任务执行的时间占比
    cancelledKeys属性表示取消注册的Key集合

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
    

    实例化NioEventLoop完成以下:

    1. 调用父类构造完善父类属性parent等,指向其group

    2. 赋值selectorProvider至provider属性

    3. openSelector()就是利用1.中的provider调用openSelector() 打开一个Selector,并将其赋值给unwrappedSelector属性以及赋值selectedKeys属性等

    4. 赋值selectStrategy属性

    至此我们明白了NioEventLoop 通过持有JDK的Selector从而实现select相关功能,那循环又是如何实现的呢?

    \color{red}{run()}

    NioEvent'Loop重写了SinglethreadEventExecutor中的抽象run方法,该方法即时循环的关键实现

    @Override
    protected void run() {
        // 死循环实现
        for (;;) {
            try {
                try {
                    // 每一次循环都会计算select调用策略,如果taskQueue有任务,即直接执行selectNow(),不阻塞
                    // 若taskQueue没有任务,即执行select() 默认阻塞1秒
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
    
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
    
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
    
                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }
    
                // 不论阻塞调用select还是非阻塞调用,都会执行以下
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    // 当前时间
                    final long ioStartTime = System.nanoTime();
                    try {
                        处理io事件
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // io事件处理结束,处理taskQueue中的任务,并指定非io任务超时时间,按ioRatio比例计算出来
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    

    至此我们知道了Nio如何实现事件循环的大体流程,详细的事件循环我会单独放在一篇文章中讲解NioEventLoop事件循环详解


    2. ServerBootstrap

    1. UML类图
    ServerBootstrap.png
    1. 属性

    volatile EventLoopGroup group 即bossGroup

    private volatile EventLoopGroup childGroup; 即workerGroup

    private volatile ChannelHandler childHandler; 即处理请求channel的处理器

    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>()
    private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();

    持有Channel反射工厂
    private volatile ChannelFactory<? extends C> channelFactory

    1. group方法

    设置用于bossGroup 和workerGroup,bossGroup的事件循环用于处理serverChannel的accept,而workerGroup里的事件循环则用于处理所有channel的IO

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        // 调用父类方法赋值group属性
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        // 赋值childGroup属性
        this.childGroup = childGroup;
        return this;
    }
    
    1. channel方法

    channel 就是根据传入的channel 的class类型去创建一个ReflectiveChannelFactory反射Channel工厂,并赋值给channelFactory属性

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
    
        this.channelFactory = channelFactory;
        return self();
    }    
    
    • ReflectiveChannelFactory
    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    
    private final Constructor<? extends T> constructor;
    
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            // 根据clazz类型获取其构造器
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }
    
    @Override
    public T newChannel() {
        try {
            // 利用反射调用构造器new对应的channel实例
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
    
    1. childHandler方法

    就是赋值childHandler属性

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }
    
    1. option方法

    就是向属性options中添加元素,该Map内的元素会用在Channel实例被创建时,调用native方法为底层socket设置相关属性若想移除内核默认socket的某个属性值,只要将参数value设置null即可

    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return self();
    }
    
    1. childOption方法

    同理该方法即是用在channel被创建时指定底层socket属性,若想移除内核默认socket的某个属性值,只要将参数value设置null即可

    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
        if (childOption == null) {
            throw new NullPointerException("childOption");
        }
        if (value == null) {
            synchronized (childOptions) {
                childOptions.remove(childOption);
            }
        } else {
            synchronized (childOptions) {
                childOptions.put(childOption, value);
            }
        }
        return this;
    }
    

    至此ServerBootstrap基本属性都通过了build的方式赋值完毕,接下来看下关键方法bind方法做了

    \color{red}{serverBootstrap.bind(port)}

    bind方法定义在了其抽象父类AbstractBootstrap中

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    

    new了一个InetSocketAddress对象作为参数继续调用了重载的bind方法

    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }
    

    validate方法做了一些基础校验,包括group属性即bossGroup是否为null以及channelFactory是否为null等

    接下来继续调用了doBind方法,重点来了

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
    
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    

    首先我们看下该方法的返回值是一个ChannelFuture对象,至于ChannelFuture是一个通道异步IO操作结果,因为Netty中所有IO操作都是以异步的方式,后面我会专门一篇文章来分析ChannelFuture
    Netty源码之ChannelFuture

    doBind方法首先调用了initAndRegister方法,从方法名上我们知道这是一个初始化和注册的方法,初始化谁?注册谁?

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1. 利用通道工厂创建一个新的channel,这里内部就是利用NioServerSocketChannel构造反射一个实例,并利用默认的SelectorProvider open了一个NioServerSocketChannel实例
            channel = channelFactory.newChannel();
            // 2. 紧接着调用init方法,该方法为抽象方法,实际的init实现在ServerBootstrap中,该方法主要处理该channel的ChannelOption属性,attr属性以及pipeline以及相应的handler等信息
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // 3. 注册该channel到serverBootstrap的bossGroup事件循环组上,具体的注册需要由事件循环组`选择`一个事件循环(`NioEventLoop`)来注册
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
    
        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.
    
        return regFuture;
    }
    
    1. 继续向下看ReflectiveChannelFactory的newChannel方法做了什么
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
    

    这里由于之前赋值了属性constructor为NioServerSocketChannel的构造器,故反射调用实例化

    NioServerSocketChannel

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    
    
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
    

    利用默认的SelectorProvider 去openServerSocketChannel,在继续调用this()方法

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    

    这里可以看到ServerSocketChannel 实例构建以后再去调用父类方法

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    

    即赋值id属性,以及new一个该Channel所属的pipeline

    至此ServerSocketChannel实例化完成

    1. init(channel)方法做了什么?

    init方法是AbstractBootstrap定义的抽象方法,具体的实现是由ServerBootstrap实现的,我们直接子类的实现

    void init(Channel channel) throws Exception {
        // 1. 获取options集合中,将其设置到底层ServerSocket上
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
    
        // 2. 获取attrs集合,设置到channel上
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
    
        // 3. 获取channel所属的pipeline
        ChannelPipeline p = channel.pipeline();
    
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }
    
        
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                // 4. 重点是这里为ServerSocketChannel添加了一个入站处理器ServerBootstrapAcceptor,该处理器会当ServerSocketChannel有Accept事件时负责将
                socketChannel注册到currentChildGroup,并设置currentChildHandler等工作,即ServerBootstrapAcceptor是一个桥梁,联通了bossGroup和workerGroup
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    
    1. config().group().register(channel)注册

    当ServerSocketChannel实例化后,并init完成,但是此时该channle还没有注册到eventLoop上,接下来就会完成注册动作

    config().group()方法就是获取当前bossGroup事件循环组实例,即NioEventLoopGroup,我们直接看它的register方法做了什么

    NioEventLoopGroup的register方法并没有覆写其抽象父类MultithreadEventLoopGroup的方法

    MultithreadEventLoopGroup

    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    

    这里调用了内部的next方法,返回一个EventLoop实例,这里大家思考以下next方法的作用是什么?

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    

    这里调用了其父类MultithreadEventExecutorGroup的next方法,继续

    @Override
    public EventExecutor next() {
        return chooser.next();
    }
    

    到了这里大家是否明白next方法的作用了呢?文章开头部分讲述了在new NioEventLoopGroup对象的时候有一个属性是chooser的赋值,该属性是一个eventLoop选择器,因为我们的NioEventLoopGroup对象内有一个eventLoop数组,当我们在注册某个channel的时候到底是注册到哪个eventLoop上呢?这个工作由这个选择器来完成

    当时实例化的是PowerOfTwoEventExecutorChooser选择器,选择的规则就是用一个自增的AtomicInteger类型的idx值去取模eventLoop数组的length - 1,就得到了channel需要注册到的eventLoop数组的下标从而取出对应的eventLoop去注册

    AbstractChannel 类的注册方法

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            // 赋值该channel所要注册到的事件循环是哪个
            AbstractChannel.this.eventLoop = eventLoop;
            // 判断当前执行线程是否是该eventLoop内部的单线程,当前线程是Main,此时的eventLoop 内的单线程为null,还没有启动过
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // 这里调用eventLoop的execute提交任务执行register0注册
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
    

    继续向下看NioEventLoop的execute方法,该方法定义在其抽象父类
    SingleThreadEventExecutor内,NioEventLoop并未重写该方法

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        
        // 判断当前执行线程是否是该EventLoop自身线程,这里是Main方法调用的,且该EventLoop自身线程还未启动(`thread属性仍为null`),返回false
        boolean inEventLoop = inEventLoop();
        // 将需要执行的任务丢进taskQueue任务队列中
        addTask(task);
        if (!inEventLoop) {
          // 这里会新建该EventLoop的唯一单线程,并调用其run方法启动事件循环
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }
    
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    

    startThread方法 启动EventLoop的执行线程

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }
    

    可以看到这里利用了CAS实现保证了只启动一个线程,再看doStartThread方法内部

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
    
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 调用自身的run方法,该方法由NioEventLoop覆写了
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
    
                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }
    
                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            FastThreadLocal.removeAll();
    
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                if (logger.isWarnEnabled()) {
                                    logger.warn("An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                                }
                            }
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
    

    这里的executor是当时创建NioEvnetLoopGroup时new的ThreadPerTaskExecutor任务执行器,包含了一个默认的线程工厂(含有“nioEventLoopGroup-2-”前缀属性,即一个事件循环组对应一个默认线程工厂)

    ThreadPerTaskExecutor类

    public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;
    
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }
    
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
    

    }

    这里就是调用线程工厂去new一个线程,并把任务传进去执行该任务,注意这里的command已经交给了新建的线程执行了

    因此thread = Thread.currentThread() 被赋予了当前执行的新线程即以“nioEventLoopGroup-2-”前缀命名的EventLoop线程

    这里的SingleThreadEventExecutor.this.run(),EventLoop线程正式启动了NioEventLoop的事件循环方法,而在其run方法中会利用for死循环不断执行该EventLoop的IO任务以及非IO任务(此处的注册任务就属于非IO任务)

    initAndRegister方法执行完成了,断点一下,我们看看此时ServerSocketChannel的属性状态

    initAndRegister.png ServerSocketChannel.png

    可以看到registered状态为true表示已注册,我们通过cmd查看一下本机端口占用情况

    image.png

    此时的端口8081还未被监听,也就是说ServerSocketChannel还没有bind到8081端口并启动监听,继续往下看代码

    \color{red}{doBind0(regFuture, channel, localAddress, promise)}

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
    
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    

    可以看到这里就是通过获取ServerSocketChannel注册到的那个EventLoop实例并提交一个任务,任务就是将ServerSocketChannel 绑定到指定的端口上

    bind实际上是调用了NioServerSocketChanneldoBind方法

    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    

    这里的javaChannel()的bind方法实际上是调用了ServerSocketChannelImpl实例自身的bind方法,注意这里用到了config里的backlog属性值,如果在ServerBootstrap的option方法不指定的话,默认windows下该值默认为200,其他情况下为128

    The SOMAXCONN value of the current machine. If failed to get the value, {@code 200} is used as a default value for Windows or {@code 128} for others.

    这里有一篇fasionchan博主发布的关于内核backlog参数的叙述
    深入理解Linux TCP backlog

    public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
        synchronized(this.lock) {
            if (!this.isOpen()) {
                throw new ClosedChannelException();
            } else if (this.isBound()) {
                throw new AlreadyBoundException();
            } else {
                InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
                SecurityManager var5 = System.getSecurityManager();
                if (var5 != null) {
                    var5.checkListen(var4.getPort());
                }
    
                NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
                Net.bind(this.fd, var4.getAddress(), var4.getPort());
                Net.listen(this.fd, var2 < 1 ? 50 : var2);
                synchronized(this.stateLock) {
                    this.localAddress = Net.localAddress(this.fd);
                }
    
                return this;
            }
        }
    }
    

    最终这里调用了bind方法和listen方法,即完成了端口的绑定和监听工作,等待Client端的connect请求

    至此整个Netty Server端启动完成,下面用一张流程图表示整个Netty Server启动流程方便记忆

    Netty Server.png

    总结

    这里回复一下文章开头的几个问题

    1. EventLoopGroup的作用说白了就是个Selector事件循环组,当有channel需要注册时,他会提供选择某个NioEventLoop去注册的功能
      至于有两个group的原因:bossGroup内提供的NioEventLoop数组是用来handle客户端的Accept连接请求的,而workGroup内的NioEventLoop则是处理客户端连接之后的事件循环(IO任务和非IO任务),一般bossGroup的NioEventLoop数组数量设为1, 而workGroup的数量默认为当前CPU数量的2被

    2. ServerBootstrap类的作用是一个简化Netty Server程序启动的启动类,其中包含了很多贯穿整个Netty程序需要用到的属性,比如channel方法指定了用于监听client socket连接的ServerSocket 的class类型,childHandler方法则用于当有client连接事件准备好后,并创建了对应的SocketChannel后,该SocketChannel对应的pipeline中需要添加进去哪些handler

    3. ServerBootstrap的bind方法是Netty Server启动的最后关键一步,前期的相关方法都可以认为是相关静态属性赋值(底层Selector初始化除外),通过bind方法,Netty实例化了NioServerSocketChannle实例,并为其init了该channel相应的option属性以及pipeline内的相关handler,这里add了一个重要的handler就是ServerBootstrap的内部类ServerBootstrapAcceptor,该类继承了ChannelInboundHandlerAdapter 入站处理器,client连接以后,ServerSocket 的pipeline最有一个处理器就是该处理器,会在该处理器中进行socketChannel的相关init工作(option属性配置,pipeline添加childHandler以及调用workGroup发起注册OP_READ动作等),最有bind方法会进行该NioServerSocketChannle的内核底层绑定端口,并监听端口,等待client的请求连接

    4. 最后说下ChannelFuture的作用,Netty中所有的异步channel的IO操作都是异步的方式,意味着所有的IO操作都将会立即返回,即无法保证所请求的I/O操作在调用结束时已完成,代替的是将会得到一个ChannelFuture的实例,通过这个实例我们将可以得到IO操作的结果和状态

    而获取结果的方式是给这个ChannelFuture的实例增加一个Listener,当该ChannelFuture实例isDone为true的时候,会通知该监听器的operationComplete方法,而我们会在该方法内部再编写逻辑,根据IO操作的结果是成功还是失败而做出不同的处理

    以上就是Netty Server端启动的所有分析,因水平有限,如有错误的地方还望不吝指出,共同进步...

    相关文章

      网友评论

          本文标题:Netty Server端启动之源码分析

          本文链接:https://www.haomeiwen.com/subject/nhqmoqtx.html