美文网首页
netty注册流程分析一

netty注册流程分析一

作者: hello_kd | 来源:发表于2020-06-14 15:13 被阅读0次

    在基于nio的编程中,一般是声明一个ServerSocketChannel对象,然后注册到selector中,监听accept事件,当有新的连接请求时,select方法返回,通过ServerSocketChannel的accept方法获取到新建立的SocketChannel连接,并注册到selector,同时监听read事件,那么netty是如何处理这一过程的呢

    我们先来看下一个netty的常规编程模式,然后再跟踪源码,来了解这个注册监听流程

    //1. 启动器,负责组装netty组件,启动服务器
    NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    
    new ServerBootstrap()
            .group(bossGroup, workerGroup)
            //选择服务器ServerSocketChannel的实现
            .channel(NioServerSocketChannel.class)
            //4 boss 负责处理连接 worker(child)负责处理读写,决定worker能执行哪些操作
            .childHandler(
                    //5 代表和客户端进行数据读写的通道
                    new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    System.out.println(msg);
                                }
                            });
                        }
            })
            .bind(8899);
    

    这里面除了ServerBootstrap组件没有讲过,其他的在前面的几篇文章都说过了,可以简单的理解为ServerBootstrap就是负责连接各个组件,启动服务器。

    这里直接看下bind方法的逻辑

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
    
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    

    doBind方法,大概可以分为这三个大步骤init、register、doBind0
    其中,init就是初始化一个Channel对象,生成关联的pipeline对象,并且添加一个入站处理器ChannelInitializer,这个入站处理器的handlerAdd方法会在channel注册到selector后才会被调用,handlerAdd方法

    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }
    
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
    

    这里面有一个很重要是,当handlerAdd方法触发时,又会往pipeline添加一个入站处理器ServerBootstrapAcceptor,并且将当前的处理器从pipeline移除。这个处理器就是用来注册新连接的读事件。

    初始化channel对象后,接着就调用register方法,就会从NioEventLoopGroup(boss group)中挑选出一个NioEventLoop对象来注册channel,而NioEventLoop对象对channel的注册逻辑,跟进代码可以看到如下

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(eventLoop, "eventLoop");
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }
    
        AbstractChannel.this.eventLoop = eventLoop;
    
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                //省略....
            }
        }
    }
    

    这里最重要的有一个判断eventLoop.inEventLoop(),这个是判断currentThread是否为eventLoop的执行线程(EventLoop第一次执行任务时会关联到一个thread对象),若是的话,直接执行register0方法;不是的话,将其作为runnable对象提交给eventLoop的任务队列,在事件循环中处理。

    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
    
            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded();
    
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    

    这个注册方法里面第一步先执行doRegister,这个就是将channel注册到selector上,但此时并没有监控accept事件。然后再调用pipeline上ChannelHandler的handlerAdd方法,这里会执行上面提到的ChannelInitializer的initChannel方法,添加一个ServerBootstrapAcceptor入站处理器。

    接着,调用safeSetSuccess方法,表明这个channel的注册完成,回调promise的监听器,然后调用pipeline上的ChannelHandler的channelRegistered方法。方法后面会判断isActive方法,因为netty中调用register方法是异步的,当方法register返回后,便会紧接着调bind方法,也就是刚刚说的三大步骤的第三部doBind0方法。

    相关文章

      网友评论

          本文标题:netty注册流程分析一

          本文链接:https://www.haomeiwen.com/subject/hoownctx.html