美文网首页
netty系列-D1部分 netty服务端启动

netty系列-D1部分 netty服务端启动

作者: 青changing | 来源:发表于2019-04-22 00:58 被阅读0次

    0、前言

    netty作为现在常用的NIO框架,以其强大的健壮性、性能、可定制性、可扩展性在同类框架中首屈一指,大部分常见中间件如果用到了远程通信,大多选择于此。
    个人也对其非常感兴趣,遂打算写一个系列,来从源码上聊聊netty。本文是第一篇netty的源码文章,先从netty服务端的启动来聊吧,其中也会对照下jdk nio的知识。

    1、相关案例

    在写之前,先写明下netty的server启动demo样例、jdk nio的server启动demo样例,帮助大家进行下回顾,也方便大家理解netty如何做的封装。

    1.1、jdk nio 样例

    这是比较基础的jdk nio server端启动的相关配置

    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);
    int port = 5566;
    serverChannel.socket().setSoTimeout(3000);      
    serverChannel.socket().bind(new InetSocketAddress(port),128);
    Selector selector = Selector.open();
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    while(true){
        int n = selector.select();
        if(n > 0) {
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey selectionKey = iter.next();
                ......
                iter.remove();
            }
        }
    }
    

    1.2、netty样例

    // 配置服务端的NIO线程组
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer<SocketChannel>() {
              @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(
                        MarshallingCodeCFactory
                            .buildMarshallingDecoder());
                    ch.pipeline().addLast(
                        MarshallingCodeCFactory
                            .buildMarshallingEncoder());
                    ch.pipeline().addLast(new SubReqServerHandler());
                }
            });
        // 绑定端口,同步等待成功
        ChannelFuture f = b.bind(port).sync();
        
        // 等待服务端监听端口关闭
        f.channel().closeFuture().sync();
    } finally {
        // 优雅退出,释放线程池资源
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
    

    2、netty启动流程

    关于NioEventLoopGroup的内容放到下个章节来讲解,本文先讲解下server端启动,可以暂时将NioEventLoopGroup理解为一组线程,管理eventLoop的生命周期,在执行的时候相关操作时,从其中选取一个eventLoop来进行执行。

    AbstractBootstrap#bind()[用户代码入口]
        AbstractBootstrap#doBind
            AbstractBootstrap#initAndRegister()[初始化并注册]
    1. 创建服务端Channel
                ChannelFactory#newChannel()[创建服务端channel][通过反射创建服务端channel]
                  NioServerSocketChannel#DEFAULT_SELECTOR_PROVIDER[通过SelectorProvider#provider()生成]
                    NioServerSocketChannel()[NioServerSocketChannel构造方法]
                       NioServerSocketChannel#newSocket()[通过jdk来创建底层jdk channel,此处等同于1.1中的ServerSocketChannel.open()]
                       AbstractChannel(Channel)[AbstractChannel的构造方法,创建id(DefaultChannelId),unsafe(不同Channel不同),pipeline(DefaultChannelPipeline)]
                       AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp)[AbstractNioChannel的构造方法,设置ch、readInterestOp属性,并设置为非阻塞模式]
                           ch.configureBlocking(false);[设置channel为非阻塞模式]
                       AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp)[AbstractNioMessageChannel构造方法,仅做透传]
                       super(null, channel, Selectioney.OP_ACCEPT)[此处可以看到传入的channel为新创建的serverSocketChannel,readInterestOp为OP_ACCEPT状态]
                       config = new NioServerSocketChannelConfig(this, javaChannel().socket());[初始化config,tcp参数配置类]
                           javaChannel().socket()[javaChannel()返回的是jdk原生的channel对象,即上面创建的NioServerSocketChannel对象,等同于1.1中的serverChannel.socket()]
                           DefaultChannelConfig(Channel)[DefaultChannelConfig的构造方法]
                               this(channel, new AdaptiveRecvByteBufAllocator());[AdaptiveRecvByteBufAllocator用于构建一个最优大小的缓冲区来接收数据。该缓存的容量会尽可能的足够大以读入所有的入站数据,并且该缓存的容量也尽可能的小以不会浪费它的空间。]
                               setRecvByteBufAllocator(allocator, channel.metadata());[对allocator的maxMessagesPerRead字段赋值,默认16(设置一个读循环可读取的最大消息个数);校验allocator不同为null]
                           DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket)[DefaultServerSocketChannelConfig构造方法,设置javaSocket]
    2. 初始化服务端Channel
                  AbstractBootstrap#init(channel);[初始化channel]
                    AbstractBootstrap#options0();[取到用户自定义的options,此处的option就是1.2的option方法填入]
                    AbstractBootstrap#setChannelOptions(channel, options, logger);[设置socket、channelConfig的值]
                    AbstractBootstrap#attrs0();[取到用户自定义的attrs]
                    channel.attr(key).set(e.getValue());[将自定的value填入channel的AttributeMap中]
                    channel.pipeline();[获取channelPipeline,默认为DefaultChannelPipeline]
                    childGroup、childHandler、childOptions、childAttrs进行取值操作
                     pipeline.addLast(new ChannelInitializer<Channel>() {...}[往pipeline中添加处理器]
                        handler=config.handler();[获取handler,此处的handler为主handler]
                        pipeline.addLast(handler); [将handler添加到pipeline末尾] 
                        ch.eventLoop().execute(new Runnable(){...})[通过eventLoop来执行runnable,这里的eventLoop其实是在下面的register才注册上的]
                            pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));[runnable里的内容,生成一个ServerBootstrapAcceptor,并传入child的信息。最后放到pipeline的尾部,关于ServerBootstrapAcceptor的详细描述看下面]
                                DefaultChannelPipeline#addLast(executor, null, h); // 将handler添加到pipeline的尾部
                                    newCtx = newContext(group, filterName(name, handler), handler); // 创建DefaultChannelHandlerContext实例
                                    addLast0(newCtx); // 添加操作,将newCtx添加到pipeline的尾部
                                    newCtx.setAddPending(); // 设置handlerState为ADD_PENDING
                                    callHandlerCallbackLater(newCtx, true); // 由于此时还未注册,所以先设置pendingHandlerCallbackHead,后续调用
                                        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); // 生成一个PendingHandlerAddedTask任务
                                        pendingHandlerCallbackHead = task; // 赋值给pendingHandlerCallbackHead
    3. 注册selector
                  config().group().register(channel)[将Channel与eventLoopGroup进行绑定]
                    config().group()[获取group属性,即eventLoopGroup]
                    EventLoopGroup#register(channel)[将channel与一个EventLoop绑定]
                        MultithreadEventLoopGroup#register
                            chooser.next().register(channel);[通过chooser选出一个EventLoop]
                            SingleThreadEventLoop#register
                                register(new DefaultChannelPromise(channel, this));[DefaultChannelPromise是一个可写的future]
                                promise.channel().unsafe().register(this, promise);
                                AbstractChannel.AbstractUnsafe#register(EventLoop,ChannelPromise)
                                    AbstractChannel.this.eventLoop = eventLoop;[将eventLoop赋值]
                                    AbstractUnsafe#register0(promise);[此处是在eventLoop中执行,相当于异步执行]
                                        AbstractUnsafe#doRegister();[进行实际注册]
                                            AbstractNioChannel#doRegister[进行实际的注册操作]
                                                javaChannel().register(eventLoop().unwrappedSelector(), 0, this);[将channel注册到selector上,返回SelectionKey,类似于1.1中的serverChannel.register(selector, SelectionKey.OP_ACCEPT);只不过此处的ops为0(即什么事件都不感兴趣),那么此处主要是将netty中的channel绑定到SelectionKey上]
                                        DefaultPipeline#invokeHandlerAddedIfNeeded();[在设置promise成功之前,调用了handlerAdded(...)方法]
                                            DefaultChannelPipeline#callHandlerAddedForAllHandlers[会执行PendingHandlerAddedTask(ChannelInitializer为PendingHandlerAddedTask的子类)的handlerAdded(..)、PendingHandlerRemovedTask的handlerRemoved(..)方法]
                                        AbstractChannel#safeSetSuccess(promise);[将promise设置为完成状态]
                                        DefaultPipeline#fireChannelRegistered();[执行channelRegistered事件]
                                        if(isActive()){[判断jdk中的ServerSocket是否已经绑定;ServerSocket#isBound]
                                            if (firstRegistration) { [第一次注册]
                                                DefaultPipeline#fireChannelActive();[active状态下,只有第一次注册时,才会执行;避免channel多次注销与注册导致触发多次]                                     
                                            } else if (config().isAutoRead()) { [是否自动读,默认值为true]
                                                beginRead();[非第一次注册,且设置了自动读,则我们应该去读,以便处理读入的数据]
                                            }
                                        }
    4. 端口绑定
            doBind0(regFuture, channel, localAddress, promise);[进行绑定处理,这里可能有些同学有疑问:注册有可能是异步的,那么这里进行端口绑定是否注册完成了呢?请看下面的解释] 
                channel.eventLoop().execute(new Runnable() {...bind操作...})[在eventLoop中执行bind操作]
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);[runnable内的内容,进行bind操作;并添加了失败的监听处理器,在失败时关闭channel]
                        AbstractChannel#pipeline.bind(localAddress);[调用DefaultChannelPipeline的bind操作,关于pipeline的相关内容,另开一篇说明]
                            tail.bind(localAddress);[从tail进行bind操作]
                                AbstractChannelHandlerContext#bind(localAddress, newPromise());[进行绑定操作,生成一个DefaultChannelPromise进行传递]
                                    next = findContextOutbound();[找下一个outbound]
                                    next.invokeBind(localAddress, promise);[下一个执行invokeBind()方法]
                                        HeadContext#bind()[最终从tail找到了head]
                                            unsafe.bind(localAddress, promise);[通过AbstractChannel.AbstractUnsafe的bind方法进行绑定]
                                                NioServerSocketChannel#doBind(localAddress);[进行真正的绑定处理了]
                                                    javaChannel().socket().bind(localAddress, config.getBacklog());[调用了jdk的绑定操作,等同于1.1中的jdk绑定操作serverChannel.socket().bind(new InetSocketAddress(port),128);]
                                                pipeline.fireChannelActive();[当doBind()之前为非active,bind()之后为active,则调用]
                                                    AbstractChannelHandlerContext.invokeChannelActive(head);[触发channelActive事件调用]
                                                        next.invokeChannelActive();
                                                            DefaultChannelPipeline.HeadContext#channelActive[会执行到该方法]
                                                                DefaultChannelPipeline.HeadContext#readIfIsAutoRead[如果是自动读,则进行读取操作,默认是]
                                                                    AbstractChannel#read();[进行读取操作]
                                                                        DefaultChannelPipeline#read
                                                                        AbstractChannelHandlerContext#read
                                                                            next = findContextOutbound();
                                                                            next.invokeRead();
                                                                                DefaultChannelPipeline.HeadContext#read[最终调用到HeadContext的read]
                                                                                    AbstractChannel.AbstractUnsafe#beginRead[开始进行读取操作]
                                                                                        AbstractNioChannel#doBeginRead[真正的进行读取操作]
                                                                                            selectionKey.interestOps(interestOps | readInterestOp);[如果对readInterestOp操作不感兴趣,则在这里设置上,此处的readInterestOp是上面 1.创建服务端 Channel中设置的OP_ACCEPT,所以此处才设置上;如同1.1中的serverChannel.register(selector, SelectionKey.OP_ACCEPT);可以读一个新的连接了]                                                                                                                                                        
                                                safeSetSuccess(promise);[将promise置为成功状态]
    

    2.1、ChannelFactory是何时设置的?

    在ServerBootstrap#channel(Class<? extends C> channelClass)方法执行时,会将传入的channelClass进行包装,生成new ReflectiveChannelFactory(channelClass),并赋值给channelFactory变量。

    ReflectiveChannelFactory只是一个简单的封装类,newChannel()是直接通过反射调用传入的channelClass的无参构造方法初始化。

    2.2、SelectorProvider.provider()有什么用?

    JDK的NIO中的SocketChannel、ServerSocketChannel和Selector的实例初始化都通过SelectorProvider类实现。

    ServerSocketChannel.open();

    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }
    

    SocketChannel.open();

    public static SocketChannel open() throws IOException {
        return SelectorProvider.provider().openSocketChannel();
    }
    

    Selector.open();

    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }
    

    2.3、 SelectorProvider.provider()具体做了什么?

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<>() {
                    public SelectorProvider run() {
                    if (loadProviderFromProperty())
                        return provider;
                    if (loadProviderAsService())
                        return provider;
                    provider = sun.nio.ch.DefaultSelectorProvider.create();
                    return provider;
                }
            });
        }
    }
    
    1. 如果配置了“java.nio.channels.spi.SelectorProvider”属性,则通过该属性值load对应的SelectorProvider对象,如果构建失败则抛异常。
    2. 如果provider类已经安装在了对系统类加载程序可见的jar包中,并且该jar包的源码目录META-INF/services包含有一个java.nio.channels.spi.SelectorProvider提供类配置文件,则取文件中第一个类名进行load以构建对应的SelectorProvider对象,如果构建失败则抛异常。
    3. 如果上面两种情况都不存在,则返回系统默认的SelectorProvider,即,sun.nio.ch.DefaultSelectorProvider.create();
    4. 随后在调用该方法,即SelectorProvider.provider()。则返回第一次调用的结果。
    5. 不同系统对应着不同的sun.nio.ch.DefaultSelectorProvider,主要是其create()方法返回的SelectorProvider不同。
        - linux
            sun.nio.ch.EPollSelectorProvider
        - mac
            sun.nio.ch.KQueueSelectorProvider
        - windows
            sun.nio.ch.WindowsSelectorProvider
    

    2.4、ServerBootstrapAcceptor作用

    此图摘自Doug Lee大神的主从Reactor多线程模型:


    image.png

    ServerBootstrapAcceptor如图中的acceptor,是一个入站处理器,其主要是完成mainReactor和subReactor的转交操作,将mainReactor accept的客户端channel,注册到subReactor上进行处理,从而实现主从Reactor模型的处理。

    其内部的具体执行逻辑在后续accept处理的逻辑中会详细讲到。

    2.5、注册有可能是异步的,那么这里进行端口绑定是否注册完成了呢?

    在AbstractBootstrap#initAndRegister()的方法中返回ChannelFuture,此时并不等同于已经完成注册。
    但是,如果此处成功的返回ChannelFuture,且cause为null。
    则有如下两种可能:

    1. 我们是在eventLoop中调用注册的,那么上面的逻辑就相当于串行的,则这个注册已经完成。
      所以,此时进行bind()或connect()都是安全的。
    2. 我们是从其他线程中来调用注册的,那么注册行为会被作为一个任务添加到eventLoop的task队列中进行排队执行。
      这个时候执行bind()或connect()也是安全的,因为bind()或connect()也会被作为任务放入到eventLoop的task队列中,队列顺序执行。

    3、结语

    本章节主要讲解netty server端启动的相关处理逻辑,其中如有错误,欢迎大家留言沟通。
    本文中提到了很多eventLoop相关的内容,下章节来看下NioEventLoop的启动,及其所完成的使命。

    相关文章

      网友评论

          本文标题:netty系列-D1部分 netty服务端启动

          本文链接:https://www.haomeiwen.com/subject/pfqegqtx.html