美文网首页
Netty服务端启动过程(ServeBootstrap)

Netty服务端启动过程(ServeBootstrap)

作者: 袁小象 | 来源:发表于2019-04-25 14:11 被阅读0次

    本篇文章主要梳理了Netty服务端的一个启动过程,比较直接,阅读此篇文章需要对Netty的基本组件以及模型有一个基本的了解。
    一个典型的Netty服务端代码如下所示:

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap(); 
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                            ch.pipeline().addLast(new LineBasedFrameDecoder(2014));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    

    步骤如下:
    1、创建两个EventLoopGroup,一个是bossGroup,一个是workerGroup,前者主要负责获取新连接的操作,后者负责处理新连接的I/O操作。
    2、定义一个ServerBootStrap实例,初始化线程池(两个)、channel类型(NioServerSocketChannel)、channel参数选项(SO_BACKLOG)、添加NioServerSocketChannel的handler(LoggingHandler)以及新连接channel的handler(ChannelInitializer)。
    3、绑定端口号,启动服务器,主线程同步阻塞等待。
    4、服务端的channel关闭之后,优雅关闭线程池
    ========================bind(port)=================================
    bootstrap的bind调用链:
    AbstractBootstrap.bind(int inetPort)
    -> AbstractBootstrap.bind(SocketAddress localAddress)
    -> AbstractBootstrap.doBind(final SocketAddress localAddress)
    最终调用的是下面的逻辑:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    

    通过上面的代码,可以看出来doBind主要做了两件事:
    1、initAndRegister();主要负责初始化NioServerSocketChannel实例、以及将该channel注册到eventLoop等操作。
    2、绑定地址;如果上一步的注册操作完成了,直接绑定地址, 没完成的话,对注册返回的Future绑定监听器,在监听器中绑定地址。

    ======================initAndRegister()========================

    initAndRegister()的逻辑,去除了非关键的try catch语句以及相关的异常处理:

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        channel = channelFactory.newChannel();
        init(channel);
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
    }
    

    通过上面的代码,可以看出来initAndRegister主要做了以下几件事:
    1、创建ServerSocketChannel实例,并为之创建了关联ChannelPipeline实例,具体逻辑可以看ServerSocketChannel的无参构造函数,这里就不详述了。
    2、执行init方法,进行channel的初始化操作。
    3、执行注册逻辑,将ServerSocketChannel实例注册到NioEventLoop中。

    ======================init逻辑==============================

    init方法在ServerBootStrap类中实现:

    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
    
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    

    通过上面的代码,可以看出来初始化操作的主要逻辑有:
    1、设置NioServerSocketChannel的options参数以及attrs参数。
    2、准备NioSocketChannel的各种参数配置,包括eventLoop线程池、事件handler、options参数以及attrs参数,以备ServerBootstrapAcceptor之用。
    3、向NioServerSocketChannel实例的pipeline中添加一个ChannelInitializer实例。ChannelInitializer的主要作用就是负责channel被注册到eventLoop后的初始化操作。

    这里重点看一下pipeline的addLast()操作,该方法在DefaultChannelPipeline实现:

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        for (ChannelHandler h: handlers) {
            addLast(executor, null, h);
        }
        return this;
    }
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //1、
            checkMultiplicity(handler);
            //2、
            newCtx = newContext(group, filterName(name, handler), handler);
            //3、
            addLast0(newCtx);
            //4、
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            //下面的逻辑都不会执行
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    

    主要的执行逻辑就在上面这个addLast方法里,可以看一下该方法的主要逻辑,这里被添加进pipeline的handler就是上面的ChannelInitializer实例:
    1、首先检查要添加的handler的重复性。如果该handler不是Sharable的并且已经被添加到其他的pipeline,就抛异常。
    2、根据所给的eventLoopGroup、handler的name以及handler创建一个ChannelHandlerContext实例。
    3、将新创建的context实例添加到pipeline的双向链表中。
    4、此时registeredfalse,也就是说pipeline对应的channel还没有被注册到eventLoop中,那么就设置该context的状态为ADD_PENDING,同时将该context封装成一个PendingHandlerAddedTask实例,将该实例添加到pipeline的一个专门的链表中,以备在channel被注册到eventLoop后执行该实例的调用。简单地看一下具体实现:

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
    

    pendingHandlerCallbackHead就是pipeline的一个专门的链表,用来维护需要执行的PendingHandlerAddedTask实例任务。
    以上的逻辑就是init初始化的一个具体操作。

    ======================register()=============================

    接下来是register逻辑,首先回顾一下initAndRegister方法:

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        channel = channelFactory.newChannel();
        init(channel);
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
    }
    

    register的调用链(config().group()返回的就是bossGroup):
    MultithreadEventLoopGroup.register(Channel channel)
    -> SingleThreadEventLoop.register(Channel channel)
    -> SingleThreadEventLoop.register(final ChannelPromise promise)
    -> AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise)
    调用的代码逻辑如下所示:

    public ChannelFuture register(Channel channel) {
        //在EventLoopGroup中选择一个EventLoop线程进行注册操作。
        return next().register(channel);
    }
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    public ChannelFuture register(final ChannelPromise promise) {
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    
    //AbstractUnsafe类
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        //前面是检查操作,不贴代码了,
        //如果eventLoop == null,抛出异常
        //如果该Channel已经注册过,ChannelPromise设置失败
        //如果该eventLoop不是NioEventLoop,ChannelPromise设置失败
        AbstractChannel.this.eventLoop = eventLoop;
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                //异常处理
            }
        }
    }
    

    register的主要操作逻辑如下:
    1、将channel注册到所选择的eventLoop上,执行到这,channel就算找到了自己的归属。之后在该channel的整个生命周期内,所有的事件执行操作都由eventLoop负责。
    2、判断当前执行线程是不是eventLoop所属的线程
    如果是,直接执行register0操作
    否则,在eventLoop的线程中执行register0
    3、如果你自己调试的话,此时你会发现eventLoop里面的线程属性还为空,所以当前执行线程肯定不是eventLoop中的线程。那么eventLoop中的线程什么时候创建呢,别急,往下看。

    紧接着会执行eventLoop.execute()SingleThreadEventExecutor类中):

    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            **startThread();**
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    

    当前执行线程还是主线程,虽然进入到eventLoop的执行逻辑里面,但是线程还没有切换(也没有线程可以切换,因为此时eventLoop的线程还没有被创建)。所以紧接着会执行下面的逻辑。

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }
    
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
               //当前线程就是新创建的线程,分配给eventLoop:eventloop.thread=Thread.currentThread();
                thread = Thread.currentThread();   
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    //非关键代码不贴
                }
            }
        });
    }
    

    上面代码中executor的类型是ThreadPerTaskExecutor,其execute()方法的逻辑就是每来一个任务,创建一个新线程执行,所以上面的new Runnable(...)任务会在新创建的线程中执行,而这个新创建的线程就被分配给了eventLoop。

    //ThreadPerTaskExecutor
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
    

    到现在为止,eventLoop中的新线程已经开始执行任务了,什么任务?那就是NioEventLoop中的run方法:

    SingleThreadEventExecutor.this.run();
    

    回到eventLoop中的execute代码中来,现在startThread方法已经执行完,新线程已经创建,正在运行当中,接下来就是向eventLoop所属的队列中添加任务。代码比较简单就不贴了。

    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    

    添加了一个什么任务呢?就是执行register0的任务。

    eventLoop.execute(new Runnable() {
        @Override
        public void run() {
            register0(promise);
        }
    });
    

    任务添加到队列之后,eventLoop线程就去队列中取任务,然后执行。所以register0是在eventLoop线程中执行的。

    下面是eventLoop现成的调用栈,可以沿着调用栈查看eventLoop线程的执行逻辑,这里就不详述了:


    image.png

    =========================register0==========================

    以下逻辑都是在eventLoop线程中执行的:

    private void register0(ChannelPromise promise) {
        try {
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
            pipeline.invokeHandlerAddedIfNeeded();
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See [https://github.com/netty/netty/issues/4805](https://github.com/netty/netty/issues/4805)
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
        }
    }
    

    从上面的代码可以看出,主要的逻辑有:
    1、执行doRegister操作,将channel注册到selector上。
    2、设置已注册标志:registered=true
    3、回调handlerAdded方法
    4、回调ChannelRegistered方法

    这里重点看一下3和4:

    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            callHandlerAddedForAllHandlers();
        }
    }
    
    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;
            registered = true;
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            this.pendingHandlerCallbackHead = null;
        }
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }
    

    HandlerAdded方法确保是在eventLoop线程中执行的。可以看到callHandlerAddedForAllHandlers的方法比较简单:首先拿到PendingHandlerCallBack的链表头,依次执行链表中的每一个任务。PendingHandlerCallBack实现了Runnable接口,可以被线程调用。

    private abstract static class PendingHandlerCallback implements Runnable
    

    下面是PendingHandlerAddedTask实现的execute方法:

    void execute() {
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
            callHandlerAdded0(ctx);
        } else {
            try {
                //当前线程不是eventLoop线程,调用eventLoop线程执行this任务,
                //最终还是执行callHandlerAdded0(ctx)方法,看下面实现的run方法。    
                executor.execute(this);
            } catch (RejectedExecutionException e) {
                remove0(ctx);
                ctx.setRemoved();
            }
        }
    }
    
    @Override
    public void run() {
        callHandlerAdded0(ctx);
    }
    

    eventLoop线程直接执行callHandlerAdded0(ctx)方法。

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
        } catch (Throwable t) {
            //异常处理,将handler从pipeline中移除掉,并调用handler的handlerRemoved方法。
        }
    }
    

    这里面,ctx.handler()返回的就是之前init过程中的ChannelInitializer实例。所以这里调用的是ChannelInitializer里面的handlerAdded方法。

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }
            return true;
        }
        return false;
    }
    

    initChannel主要做了这几件事:
    1、初始化channel,这里面的initChannel方法就是在init里面实现的。
    2、出现异常的话进行异常捕获
    3、最终,将ChannelInitializer的实例从pipeline中移除。

    p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    

    initChannel的逻辑就是:
    1、向pipeline中添加用户在ServerBootStrap中指定的handler。
    2、调用eventLoop线程执行pipeline的addLast逻辑,向pipeline中添加一个ServerBootstrapAcceptor实例,该实例的作用就是当有连接到来,创建了新channel之后,对该channel进行初始化。

    这里由于当前线程就是eventLoop线程,所以execute执行逻辑就是将该任务添加到队列中,等该线程执行完当前任务后再从队列中取任务执行:

    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            //非关键代码不贴
        }
        //非关键代码不贴
    }
    

    以上的逻辑就是channel注册到eventLoop之后回调的handlerAdded方法。一句话,就是执行ChannelInitializerinitChannel方法,对channel进行初始化,添加额外的handler。

    接下来将回调channelRegistered方法。

    pipeline.fireChannelRegistered();
    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }
    

    回调完成之后,整个register0的逻辑计算大体执行完了。

    整个register0方法就是在eventLoop中执行的,执行完之后,eventLoop线程继续进行无限for循环:如果队列中有任务,就取任务执行,否则进行selector.select操作,具体逻辑可以到NioEventLooprun方法去查看。整个eventLoop线程的大体执行逻辑就是这样。

    ===========================================================

    然后我们回到主线程:

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
        }
    
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
    

    整个register逻辑就算执行完了,返回regFuture

    紧接着就是执行doBind0操作

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            //非关键代码不贴
        }
    }
    

    可以看到绑定地址的操作也是在eventLoop线程中执行的,这里execute就是提交任务到队列,由eventLoop线程去队列取任务执行。

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    

    执行完之后,返回了一个ChannelPromise实例,然后调用其同步方法sync

    ChannelFuture future = bootstrap.bind(port).sync();
    

    此时主线程已经被阻塞住,查看线程状态:

    image.png

    主线程阻塞,eventLoop线程一直loop。至此,整个服务端就算已经启动完毕了。

    相关文章

      网友评论

          本文标题:Netty服务端启动过程(ServeBootstrap)

          本文链接:https://www.haomeiwen.com/subject/nlaagqtx.html