美文网首页
【Netty】Netty的启动过程一

【Netty】Netty的启动过程一

作者: 小圣996 | 来源:发表于2020-01-08 14:43 被阅读0次

首先来张网上盛传的netty框架参考图,以供跟踪代码参考:


netty框架参考图.jpg

一段标准的Netty服务端启动代码如下:

    public NettyTcpServer() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup(4);
        bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 5)
                .childOption(ChannelOption.TCP_NODELAY, true);
    }

    public void bind(String ip, int port) {
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast("decoder", new ProtoDecoder(upLimit))
                        .addLast("server-handler", new ServerHandler()) 
                        .addLast("encoder", new ProtoEncoder(downLimit));
            }
        });
        InetSocketAddress address = new InetSocketAddress(ip, port);
        try {
            bootstrap.bind(address).sync();
        } catch (InterruptedException e) {
            log.error("bind "+ip+":"+port+" failed", e);
            shutdown();
        }
    }

Netty的服务端,一般会启动两个NioEventLoopGroup线程组(个人感觉用组比用池更准确,这里组指数组),一个为bossGroup线程组,处理客户端的连接请求;一个workerGroup线程组,用来处理IO事件。很多人都知道Netty服务端就是做这事的,今天我们就用源码来揭示这是如何实现的。
先从NioEventLoopGroup的构造方法着手:

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

一路查看实现,会来到这里:

    private static final int DEFAULT_EVENT_LOOP_THREADS;
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

可见,实例化NioEventLoopGroup时,如果在这里没有设置参数,也没有在JVM参数里设置“-Dio.netty.eventLoopThreads=x”,那么这个线程组的默认线程数为CPUx2,否则为设置的参数值,最后,可以看到NioEventLoopGroup的具体实现为:

        if (executor == null) {
            //后续Netty对各种IO事件的处理就是通过此executor创建线程处理的
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        //创建NIOEventLoop数组
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //实例化每个NIOEventLoop,每个NIOEventLoop公用Executor、SelectorProvider、EventExecutorChooserFactory、RejectedExecutionHandlers
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                //......
            }
        }

        //选择器,选择由哪个NIOEventLoop处理,注意这里以children作为传参
        chooser = chooserFactory.newChooser(children);

在实例化NIOEventLoopGroup时,首先创建了一个Executor,而Executor的作用就是通常被用来代替显示地创建线程的,Executor对象可以用来执行Runnable任务,该接口将“任务提交”从任务的运行机制中解耦出来,包括线程使用、调度等细节。我们看Netty中的它的实现如下,由它的execute方法中threadFactory.newThread(command).start();一句,证实了我们所说的Executor的作用,netty很有可能就是通过这句来创建它的那些IO线程的,我们不妨先猜猜,它是在什么时候执行这句的呢?(彩蛋)

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
      threadFactory.newThread(command).start();//创建NettyIO线程,重点代码,将断点打于此处,可查看整个Netty线程启动步骤
    }

再回溯到上面的NIOEventLoopGroup代码段,由children = new EventExecutor[nThreads];一句,NIOEventLoopGroup实则创建了一个new NioEventLoopGroup(4)中参数数量的NIOEventLoop数组,NIOEventLoop的实例就是通过newChild(executor, args)方法添加的,由下面的几段代码可知,每个NIOEventLoop公用Executor、SelectorProvider、EventExecutorChooserFactory、RejectedExecutionHandlers。而newChild(executor, args)方法就是做NIOEventLoop实例的初始化工作,一路跟踪我们还可发现,NIOEventLoop的任务队列用的是LinkedBlockingQueue,大小为Integer.MAX_VALUE,如果没设置JVM参数“-Dio.netty.eventLoop.maxPendingTasks=x”的话

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

NIOEventLoop构造为:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        selector = openSelector();
        selectStrategy = strategy;
    }

再看该构造中的super方法为:

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

由以上代码我们大概知道了NIOEventLoopGroup和NIOEventLoop的实现,NIOEventLoopGroup其实是一个NIOEventLoop的数组,每个NIOEventLoop都公用了一个Executor,后续创建线程的事都由Executor来创建,由threadFactory.newThread(command).start();一句可知。

但是我们还有一个疑问,那就是Netty什么时候开始启动这些线程的呢?上述线程的启动貌似和new NioEventLoopGroup(4)中的参数也没有任何关系,那它怎么知道只创建new NioEventLoopGroup(4)中的参数个数的线程呢?我们只知道这个参数赋值给了children = new EventExecutor[nThreads];但在哪里限制了此参数的线程数量呢?即Netty怎么知道最大要启动这么多个线程呢?线程的启动数量和这个children数组有什么关系呢?

带着这些疑问,我们不妨全局搜索这个children这个变量

children全局引用.png

发现它大多数时候用于关闭和终止判断,这些看不出和启动线程数有什么关系;只有在NioEventLoopGroup的实现中这里将children传参给了一个字面意思叫选择器的类,如下:

//选择器,选择由哪个NIOEventLoop处理,注意这里children的传参
chooser = chooserFactory.newChooser(children);

启动的线程数是否和这个选择器有关呢?我们再跟进去看看

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

发现,netty把这个NIOEventLoop数组赋值给了它的两个内部类Chooser之一,它们的唯一提供的使用方法是next()方法,看这方法实现:executors[Math.abs(idx.getAndIncrement() % executors.length)]貌似这里告诉了netty当前应使用哪个NIOEventLoop,看来可以怀疑启动的线程数量和这个next()方法有关,那只要全局搜下这个next()方法,就应该知道了

next()全局引用.png

有这么多地方调用,要是第一次看Netty源码,从next()反推过去看代码直至看到跟ServerBootstrap或NioEventLoopGroup相关的next()调用,太难找了(我第一次看就是- -),而Netty服务端启动代码中间还有那么多的源码没看,究竟是在哪里限制了线程的启动数量呢?

既然不知道在哪里限制线程数量的,也不知道何时启动线程的,但是线程的创建地方我们是找到了的,即executor的初始化地方,如下

        if (executor == null) {
            //后续Netty对各种IO事件的处理就是通过此executor创建线程处理的
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

点进ThreadPerTaskExecutor,根据Executor的作用“就是通常被用来代替显示地创建线程的”,我们怀疑是在这里创建线程的,那么我们何不在此打上断点呢?

    @Override
    public void execute(Runnable command) {
      threadFactory.newThread(command).start();//创建NettyIO线程,重点代码,将断点打于此处,可查看整个Netty线程启动步骤
    }

打上断点后,启动ServerBootstrap,发现确实调到这里来了,由此证实了我们猜想的正确性,通过查看调用堆栈,如下图所示:

服务端启动启动boss线程.png

一下就可以发现,原来Netty第一个线程的启动是在绑定地址和端口开始的,再看里面的NioEventLoopGroup(MultithreadEventLoopGroup).register(Channel) 行: 85 这句,

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel); //先选一个NIOEventLoop出来,然后再执行它的register(channel)方法
    }

它就是我们在查找next()全局引用图中(见上一图next()全局引用)的最小红框标记的MultithreadEventLoopGroup类下的register(Channel)方法,我们开始不知道netty启动线程是如何限制数量的,只怀疑这数量限制跟next()方法有关,现在前后呼应起来了,知道是这么个流程了,即Netty是在绑定服务器地址和ip时,先启动一个线程去接受客户端连接的,这个线程的启动过程就如上图所示(客户端的启动线程是在connect方法中开始的)。

再回到Netty的服务端启动代码,现在我们换种方式看源码,之前我们只看了NioEventLoopGroup实现,其余的还都没看,现在我们跳过中间那些源码,从bootstrap.bind(address).sync();中bind方法开始。

一直跟踪方法堆栈,暂且先别管其他源码,我们要先弄懂Netty是如何启动boss线程组,去接受客户端的连接,如何启动worker线程组,去处理各类IO事件。点进看register实现:

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ...
                }
            }
        }

跟踪源码时可知NioEventLoop继承自SingleThreadEventLoop,而SingleThreadEventLoop继承自SingleThreadEventExecutor,SingleThreadEventExecutor最终又继承自Executor,所以在这里最终会进入我们刚打的断点,再看SingleThreadEventExecutor中的execute方法:

    @Override
    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
    }

进入startThread()最终来到这里:

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {//这里就是每个NIOEventLoop中的executor,它会执行NIOEventLoop中的executor中的threadFactory.newThread(command).start();这句
            @Override
            public void run() { //注意这里是线程启动后需执行的代码
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                }
            }
        });
    }

这便创建并启动了netty的处理客户端连接的线程。线程启动后需要做什么事呢?根据我们的了解,Netty的bossGroup线程组和workerGroup线程组启动后,要分别时刻处理客户端的连接和IO事件,那么这些线程应该具备某种功能,需要时时刻刻知道是否有客户端连接或IO事件,根据以往经验,这个实现往往是用无限for循环实现的,我们只要找到哪里有for死循环的地方或类似功能的地方即可。再点进SingleThreadEventExecutor.this.run(),来到NioEventLoop的run方法,我们要找到哪里具备如此功能:

@Override
    protected void run() {
        for (;;) {
            try {
                //可以在此处打上断点,验证启动后是否会进这里无限循环
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys();

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } catch (Throwable t) {
            }
        }
    }

一进来就发现是无限循环了,根据以往对无限循环的认知,在无限循环里往往是可以时时刻刻做某种事的,再点进processSelectedKeys()方法,查看哪里在做跟连接和处理IO相关的事了,最终找到这里:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

由SelectionKey.OP_CONNECT、SelectionKey.OP_ACCEPT、SelectionKey.OP_READ、SelectionKey.OP_WRITE字面猜想,很可能就是这里处理连接和IO事件的,我们先在上面run方法里的switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) 这句打上断点,然后再分别在这些if分支里再打上断点,验证我们的猜想,重新启动netty服务端:

启动netty服务端进入run方法无限循环.png

果然进入了此断点,验证了我们猜想的正确性,但这里只是启动了处理客户端连接的线程,这里还无法进入后面打的那些if分支的,因为还没有客户端请求连接和发送数据操作,因此,先放开run里的断点,我们再启动一个netty客户端(netty服务端和netty客户端分别用的是《使用Netty+Protobuf实现游戏TCP通信》的源码):

netty客户端在connect方法中启动了自己的线程.png

可见,客户端在自己的connect()方法中启动了自己的线程,请求后,客户端进入了SelectionKey.OP_CONNECT分支,即发起请求连接

客户端进入SelectionKey.OP_CONNECT分支发起请求连接.png

而服务端,则进入了SelectionKey.OP_ACCEPT分支,此时readyOps=16,接受连接请求

服务端进入SelectionKey.OP_ACCEPT分支接受连接.png

若我们提前在threadFactory.newThread(command).start()打上断点的话,如下:

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

会发现,服务端会再次(第二次)进入此断点,第一次服务端进入此断点,是启动boss线程,第二次进入是启动worker线程,若客户端还有数据发给服务端,则服务端还会再次进入SelectionKey.OP_READ分支,此时readyOps=1

服务端再次进入SelectionKey.OP_READ读取IO数据.png

最终来个简单流程回顾:
1)首先服务端启动netty -> 服务端netty会启动boss线程;
2)客户端启动netty -> 客户端netty会启动自己请求连接线程,客户端进入SelectionKey.OP_CONNECT分支;
3)服务端进入SelectionKey.OP_ACCEPT分支;
4)服务端netty -> 启动worker线程,接受客户端的连接;
如果客户端再发送数据给服务端:
5)服务端进入SelectionKey.OP_READ,读取客户端发送的数据;
如果服务端也发送数据给客户端:
6)客户端进入SelectionKey.OP_READ,读取服务端发送的数据;

我们现在已经大致知道了是如何启动boss线程的了,那么worker线程又是如何启动的呢?超过worker线程上限,netty又如何知道不需要再启动线程了呢?

限于篇幅,将在下篇讲解,敬请期待...

相关文章

网友评论

      本文标题:【Netty】Netty的启动过程一

      本文链接:https://www.haomeiwen.com/subject/birfactx.html