美文网首页javaJava 杂谈程序员
第十六节 netty源码分析之 server端的源码分析

第十六节 netty源码分析之 server端的源码分析

作者: 勃列日涅夫 | 来源:发表于2019-01-30 15:15 被阅读3次

    netty server端

    以netty官方EchoServer服务器端的启动代码分析:

    public final class EchoServer {
    
        static final boolean SSL = System.getProperty("ssl") != null;
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                SelfSignedCertificate ssc = new SelfSignedCertificate();
                sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
            } else {
                sslCtx = null;
            }
    
            // Configure the server.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);//用于处理客户端的连接请求
            EventLoopGroup workerGroup = new NioEventLoopGroup();//用于处理与各个客户端连接的 IO 操作
            final EchoServerHandler serverHandler = new EchoServerHandler();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 100)
                        //负责处理客户端的连接请求
                 .handler(new LoggingHandler(LogLevel.INFO))
                        //负责和客户端的连接的 IO 交互
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc()));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(serverHandler);
                     }
                 });
    
                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    

    和客户端的代码相比, 没有很大的差别, 基本上也是进行了如下几个部分的初始化:

        EventLoopGroup: 不论是服务器端还是客户端, 都必须指定 EventLoopGroup. 在这个例子中, 指定了 NioEventLoopGroup, 表示一个 NIO 的EventLoopGroup, 不过服务器端需要指定两个 EventLoopGroup, 一个是 bossGroup, 用于处理客户端的连接请求; 另一个是 workerGroup, 用于处理与各个客户端连接的 IO 操作.
    
        ChannelType: 指定 Channel 的类型. 因为是服务器端, 因此使用了 NioServerSocketChannel.
    
        Handler: 设置数据的处理器.
    
    1. Channel 的初始化

    group.channel(NioServerSocketChannel.class)
    根据源码以及在分析客户端源码很容易看出来服务端channel的初始化

     public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
    

    这是会将ServerBootstrap属性ChannelFactory初始化ReflectiveChannelFactory且clazz为NioServerSocketChannel
    至于类型也即NioServerSocketChannel
    NioServerSocketChannel的类图如下:


    图片.png

    unsafe 字段其实是一个 AbstractNioMessageChannel#AbstractNioUnsafe 的实例.
    我们来总结一下, 在 NioServerSocketChannsl 实例化过程中, 所需要做的工作:

    调用 NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO ServerSocketChannel
    
    AbstractChannel(Channel parent) 中初始化 AbstractChannel 的属性:
    
        parent 属性置为 null
    
        unsafe 通过newUnsafe() 实例化一个 unsafe 对象, 它的类型是 AbstractNioMessageChannel#AbstractNioUnsafe 内部类
    
        pipeline 是 new DefaultChannelPipeline(this) 新创建的实例.
    
    AbstractNioChannel 中的属性:
    
        SelectableChannel ch 被设置为 Java ServerSocketChannel, 即 NioServerSocketChannel#newSocket 返回的 Java NIO ServerSocketChannel.
    
        readInterestOp 被设置为 SelectionKey.OP_ACCEPT
    
        SelectableChannel ch 被配置为非阻塞的 ch.configureBlocking(false)
    
    NioServerSocketChannel 中的属性:
    
        ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())
    
    1. ChannelPipeline 初始化和客户端一致,在创建channel时会创建pipeline

    2. bossGroup 与 workerGroup

    这里的bossGroup和workerGroup。就是前面介绍nio reactor模式的多线程版本 可参考


    图片.png

    bossGroup 不断地监听是否有客户端的连接, 当发现有一个新的客户端连接到来时, bossGroup 就会为此连接初始化各项资源, 然后从 workerGroup 中选出一个 EventLoop 绑定到此客户端连接中. 那么接下来的服务器与客户端的交互过程就全部在此分配的 EventLoop 中

    源码:
    1、 初始化EventLoopGroup

    /**
         * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
         * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
         * {@link Channel}'s.
         */
        public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        //和客户端的相同
            super.group(parentGroup);
            if (childGroup == null) {
                throw new NullPointerException("childGroup");
            }
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.childGroup = childGroup;
            return this;
        }
    

    2、 group是如何和channel关联

    • 通过客户端分析可知在connect,而服务端为bind方法绑定

    AbstractBootstrap.bind -> AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister
    源码:

    //  实例化channel和 channel 的注册过程
      final ChannelFuture initAndRegister() {
          Channel channel = null;
          try {
              channel = channelFactory.newChannel();
              init(channel);
          } catch (Throwable t) {
              if (channel != null) {
                  // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                  channel.unsafe().closeForcibly();
                  // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                  return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
              }
              // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
              return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
          }
    //1、config()方法为子类Bootstrap实现返回BootstrapConfig对象。
    // 2、BootstrapConfig中group()其实调用的为其父类 AbstractBootstrapConfig中的group()方法,该方法中返回bootstrap.group()即bootstrap中我们添加的group
    //3、也就是最终调用NioEventLoopGroup的父类MultithreadEventExecutorGroup的register方法。该方法返回 next().register(channel);
    // 4、 next()方法MultithreadEventExecutorGroup的父类MultithreadEventExecutorGroup实现的。该方法返回  chooser.next(); 这里的
    //这里的chooser是DefaultEventExecutorChooserFactory由方法chooserFactory.newChooser(children)返回;可参考(NioEventLoopGroup的父类MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup类的构造器this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);)
    //我们还记得最初NioEventLoopGroup的构造器最终会调用MultithreadEventExecutorGroup的构造器MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)
    // 在构造器中有一个children[i] = newChild(executor, args);方法格外引起我们的注意。因为这里的前面的next方法返回children数组中的值,newChild方法的实现类在NioEventLoopGroup中
    //它返回return new NioEventLoop(this, executor, (SelectorProvider) args[0],
    //            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);最后NioEventLoop的父类SingleThreadEventLoop 父类中的注册方法register,该方法调用到第四步
    // 5、promise.channel().unsafe().register(this, promise);获取 channel 的 unsafe() 底层操作对象, 然后调用它的 register.
    //6、在 AbstractUnsafe.register 方法中, 调用 register0 方法注册 Channel。AbstractUnsafe.register0 中, 调用 AbstractNioChannel#doRegister 方法
    //7、AbstractNioChannel.doRegister 方法通过 javaChannel().register(eventLoop().selector, 0, this) 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment.
          ChannelFuture regFuture = config().group().register(channel);
          if (regFuture.cause() != null) {
              if (channel.isRegistered()) {
                  channel.close();
              } else {
                  channel.unsafe().closeForcibly();
              }
          }
    
          // If we are here and the promise is not failed, it's one of the following cases:
          // 1) If we attempted registration from the event loop, the registration has been completed at this point.
          //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
          // 2) If we attempted registration from the other thread, the registration request has been successfully
          //    added to the event loop's task queue for later execution.
          //    i.e. It's safe to attempt bind() or connect() now:
          //         because bind() or connect() will be executed *after* the scheduled registration task is executed
          //         because register(), bind(), and connect() are all bound to the same thread.
    
          return regFuture;
      }
    }
    

    这里 group() 方法返回的是上面 bossGroup, 至于channel 是一个 NioServerSocketChannsl 实例, 因此我们可以知道, group().register(channel) 将 bossGroup 和 NioServerSocketChannsl 关联.
    剩下的workerGroup 是在哪里与 NioSocketChannel 关联的呢?
    我们继续看 init(channel) 方法,在子类ServerBootstrap中实现:

    //这里初始化的为nioserverchannel
        @Override
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    //currentChildGroup、currentChildHandler客户端的连接的 IO 交互
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    上面代码中init 方法在 ServerBootstrap 中重写了, 从上面的代码片段中我们看到, 它为 pipeline 中添加了一个 ChannelInitializer, 而这个 ChannelInitializer 中添加了一个关键的 ServerBootstrapAcceptor handler.
    关于 handler 的添加与初始化的过程, 我们留待下一小节中分析, 我们现在关注一下 ServerBootstrapAcceptor 类.ServerBootstrapAcceptor 中重写了 channelRead 方法, 其主要代码

    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
        child.pipeline().addLast(childHandler);
        ...
        childGroup.register(child).addListener(...);
    }
    

    ServerBootstrapAcceptor 中的 childGroup 是构造此对象是传入的 currentChildGroup, 即我们的 workerGroup,
    而 Channel 是一个 NioSocketChannel 的实例, 因此这里的 childGroup.register 就是将 workerGroup 中的摸个 EventLoop 和 NioSocketChannel 关联了.
    , ServerBootstrapAcceptor.channelRead 方法是怎么被调用的呢? 其实当一个 client 连接到 server 时,Java 底层的 NIO ServerSocketChannel 会有一个 SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 NioServerSocketChannel.doReadMessages:

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();
        ... 省略异常处理
        buf.add(new NioSocketChannel(this, ch));
        return 1;
    }
    

    在 doReadMessages 中, 通过 javaChannel().accept() 获取到客户端新连接的 SocketChannel, 接着就实例化一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this), 由此可知, 我们创建的这个 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 .
    接下来就经由 Netty 的 ChannelPipeline 机制, 将读取事件逐级发送到各个 handler 中, 于是就会触发前面我们提到的 ServerBootstrapAcceptor.channelRead 方法啦

    简单用一个图展示下:

    图片.png

    handler 的添加
    服务器端的 handler 的添加过程和客户端的有点区别
    EventLoopGroup 一样, 服务器端的 handler 也有两个, 一个是通过 handler() 方法设置 handler 字段, 另一个是通过 childHandler() 设置 childHandler 字段
    分析上面ServerBootstrap 重写了 init 方法
    pipeline中由

    head<->ChannelInitializer<->tail
    在客户端分析ChannelInitializer,当channel绑定到eventLoop后(在这里是 NioServerSocketChannel 绑定到 bossGroup)中时, 会在pipeline中发出fireChannelRegistered 事件, 接着就会触发 ChannelInitializer.initChannel 方法的调用.
    然后变成
    head<->LoggingHandler<->ServerBootstrapAcceptor<->tail

    上面是server的handler,那么childHandler 是在哪里添加的呢?还记得ServerBootstrapAcceptor这个链接的handler,很明显这里是和客户端交互的地方。

    先说结论,根据源码在ServerBootstrapAcceptor中channelRead方法将我们childhandler添加。
    那么就带来两个问题。1 child channle是如何从channelRead获取。2、channelRead是何时调用
    不过这两个问题通过源码可以一并解决:

    首先是知道从那里来。ServerBootstrapAcceptor继承ChannelInboundHandlerAdapter,作为inbound的handler,并且重写channelRead
    那么当客户端发送数据到客户端时,对于服务端就是接收读取的io事件,那么就会执行channelRead这个方法(这里就很清楚io一般时阻塞,所以在channelRead
    方法中childHandler一般是我们自己实现的io操作handler,将这个handler绑定childChannel上处理,且处理他的线程为childGroup)这么做的用途很明显,

     //inbound事件到来时,这里就是客户端IO
            @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                setChannelOptions(child, childOptions, logger);
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
                try {
                 //将操作io的handler绑定到childGroup,执行完成后断开childchannel
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
    
    • 总结: 从上面分析服务端有两个pipeline。ServerBootstrapAcceptor只负责接受客户端的connect也就是服务端的accept事件(所以上面方法的入参msg为客户端的channel)
      链接后将后续的io相关的childHandler 绑定到这个childChannel上且注册childGroup
    图片.png

    相关文章

      网友评论

        本文标题:第十六节 netty源码分析之 server端的源码分析

        本文链接:https://www.haomeiwen.com/subject/zdptdqtx.html