Netty学习 - Bootstrap引导

作者: buzzerrookie | 来源:发表于2018-07-19 15:08 被阅读10次

    引导

    在Netty中,有两种引导,一是Bootstrap,用于引导客户端或者无连接服务器;另一种便是ServerBootstrap,用于引导面向连接的服务器。Bootstrap整个类层次如下图所示,本文将依次分析AbstractBootstrap、Bootstrap和ServerBootstrap。


    Bootstrap类层次.png

    AbstractBootstrap类

    AbstractBootstrap类的javadoc说明如下:

    AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. It support method-chaining to provide an easy way to configure the AbstractBootstrap.
    When not used in a ServerBootstrap context, the bind() methods are useful for connectionless transports such as datagram (UDP).

    上述文字表明:AbstractBootstrap类支持方法的链式调用,当不使用ServerBootstrap时,bind方法可以用于无连接的协议如UDP等,这与日常用法相一致。

    成员变量和构造函数

    volatile EventLoopGroup group;
    @SuppressWarnings("deprecation")
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private volatile ChannelHandler handler;
    
    AbstractBootstrap() {
        // Disallow extending from a different package.
    }
    
    AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
        group = bootstrap.group;
        channelFactory = bootstrap.channelFactory;
        handler = bootstrap.handler;
        localAddress = bootstrap.localAddress;
        synchronized (bootstrap.options) {
            options.putAll(bootstrap.options);
        }
        synchronized (bootstrap.attrs) {
            attrs.putAll(bootstrap.attrs);
        }
    }
    

    以上代码有以下几点需要注意:

    • 构造函数中直接访问了另一个实例的私有变量,这种是被允许的,因为访问控制是控制其他类是否能够访问本类的变量或者方法,具体可参阅Controlling Access to Members of a Class
    • 为什么要有两个synchronized代码块呢?因为putAll方法的javadoc表明如果源map在被put到目的map的过程中被修改,这个过程是未定义的,具体可参阅官方文档

    成员方法

    AbstractBootstrap类是一种构建者模式(Builder)

    • group方法设置了成员变量group;
    • channel/channelFactory方法设置了成员变量channelFactory,用来创建Channel。区别在于channelFactory方法直接指定了工厂,channel则是利用类参数创建了ReflectiveChannelFactory实例然后接着调用了channelFactory方法。channelFactory方法的javadoc提到:

      This method is usually only used if channel(Class) is not working for you because of some more complex needs. If your Channel implementation has a no-args constructor, its highly recommend to just use channel(Class) to simplify your code

    • localAddress方法设置了成员变量localAddress,这个方法有几种重载的形式;
    • option方法和attr方法相似,都可以使用null移除键;
    • validate方法验证group和channelFactory均不为null,这个方法会在bind方法中被调用。注意子类可以重写该方法,但必须调用基类的方法,后续会看到Bootstrap和ServerBootstrap都重写了该方法,加入了自己额外的验证。《Netty实战》8.2.2节有以下描述,这正是父类和子类validate方法的作用;

      在引导的过程中,在调用bind()或者connect()方法之前,必须调用以下方法来设置所需的组件:
      group();
      channel()或者channelFactory();
      handler().
      如果不这样做,则将会导致IllegalStateException。对handler()方法的调用尤其重要,因为它需要配置好ChannelPipeline。

    • bind方法比较复杂,下面详细分析一下。

    bind方法

    bind方法会在内部调用doBind方法,首先会调用initAndRegister方法初始化并注册通道,接下来按注册是否结束分情况讨论,都是交由doBind0方法处理。

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
    
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    

    initAndRegister方法

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
    
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
    
    abstract void init(Channel channel) throws Exception;
    

    initAndRegister方法是一个模板方法

    1. 利用channelFactory新建通道,以前述的ReflectiveChannelFactory为例,其newChannel方法会根据传入的Channel类型调用对应的无参构造函数返回新建的通道;
      public ReflectiveChannelFactory(Class<? extends T> clazz) {
          if (clazz == null) {
              throw new NullPointerException("clazz");
          }
          this.clazz = clazz;
      }
      
      @Override
      public T newChannel() {
          try {
              return clazz.getConstructor().newInstance();
          } catch (Throwable t) {
              throw new ChannelException("Unable to create Channel from class " + clazz, t);
          }
      }
      
    2. 调用抽象的init方法初始化新建的通道,子类需要重写该方法;
    3. 将新建的通道注册到与该引导类关联的EventLoopGroup上。

    doBind0方法

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
    
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    

    doBind0方法使得在Channel绑定的EventLoop上执行具体的通道绑定操作。注意从doBind0被调用的位置可以看到其一定是在注册操作完成之后被调用:

    • doBind方法中if (regFuture.isDone()) 代码块内,这时注册已经完成,不用去管是否成功,因为doBind0内部会判断;
    • doBind方法中的else代码块与上面类似。

    Bootstrap类

    Bootstrap类继承了AbstractBootstrap类,新增加了remoteAddress和resolver成员变量,与之对应有remoteAddress和resolver成员方法。

    成员方法

    • validate方法:前文提到Bootstrap会重写AbstractBootstrap类的该方法,Bootstrap类除了调用基类的方法,还验证了handler不为null;

    • init方法:为通道添加了配置的处理器,设置了通道的选项和属性;

      void init(Channel channel) throws Exception {
          ChannelPipeline p = channel.pipeline();
          p.addLast(config.handler());
      
          final Map<ChannelOption<?>, Object> options = options0();
          synchronized (options) {
              setChannelOptions(channel, options, logger);
          }
      
          final Map<AttributeKey<?>, Object> attrs = attrs0();
          synchronized (attrs) {
              for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                  channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
              }
          }
      }
      
    • connect方法:connect方法会在内部调用doResolveAndConnect方法完成解析远程域名和连接的工作,整体流程与bind方法很相似,也是先初始化并注册通道,接下来按注册是否结束分情况讨论,委托给了doResolveAndConnect0方法。

      private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
          final ChannelFuture regFuture = initAndRegister();
          final Channel channel = regFuture.channel();
      
          if (regFuture.isDone()) {
              if (!regFuture.isSuccess()) {
                  return regFuture;
              }
              return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
          } else {
              // Registration future is almost always fulfilled already, but just in case it's not.
              final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
              regFuture.addListener(new ChannelFutureListener() {
                  @Override
                  public void operationComplete(ChannelFuture future) throws Exception {
                      // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                      // failure.
                      Throwable cause = future.cause();
                      if (cause != null) {
                          // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                          // IllegalStateException once we try to access the EventLoop of the Channel.
                          promise.setFailure(cause);
                      } else {
                          // Registration was successful, so set the correct executor to use.
                          // See https://github.com/netty/netty/issues/2586
                          promise.registered();
                          doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                      }
                  }
              });
              return promise;
          }
      }
      
      private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                                 final SocketAddress localAddress, final ChannelPromise promise) {
          try {
              final EventLoop eventLoop = channel.eventLoop();
              final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
      
              if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                  // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                  doConnect(remoteAddress, localAddress, promise);
                  return promise;
              }
      
              final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
      
              if (resolveFuture.isDone()) {
                  final Throwable resolveFailureCause = resolveFuture.cause();
      
                  if (resolveFailureCause != null) {
                      // Failed to resolve immediately
                      channel.close();
                      promise.setFailure(resolveFailureCause);
                  } else {
                      // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                      doConnect(resolveFuture.getNow(), localAddress, promise);
                  }
                  return promise;
              }
      
              // Wait until the name resolution is finished.
              resolveFuture.addListener(new FutureListener<SocketAddress>() {
                  @Override
                  public void operationComplete(Future<SocketAddress> future) throws Exception {
                      if (future.cause() != null) {
                          channel.close();
                          promise.setFailure(future.cause());
                      } else {
                          doConnect(future.getNow(), localAddress, promise);
                      }
                  }
              });
          } catch (Throwable cause) {
              promise.tryFailure(cause);
          }
          return promise;
      }
      
      private static void doConnect(
              final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
      
          // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
          // the pipeline in its channelRegistered() implementation.
          final Channel channel = connectPromise.channel();
          channel.eventLoop().execute(new Runnable() {
              @Override
              public void run() {
                  if (localAddress == null) {
                      channel.connect(remoteAddress, connectPromise);
                  } else {
                      channel.connect(remoteAddress, localAddress, connectPromise);
                  }
                  connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
              }
          });
      }
      

    ServerBootstrap类

    ServerBootstrap类继承了AbstractBootstrap类,新增加了childHandler、childGroup、childOptions和childAttrs成员变量,并增加了与之对应的成员方法。

    成员方法

    • validate方法:前文提到ServerBootstrap会重写AbstractBootstrap类的该方法,ServerBootstrap类除了调用基类的方法,还验证了childHandler和childGroup均不为null;
    • init方法:为监听的通道设置了通道的选项和属性,并添加了处理器;
      void init(Channel channel) throws Exception {
          final Map<ChannelOption<?>, Object> options = options0();
          synchronized (options) {
              setChannelOptions(channel, options, logger);
          }
      
          final Map<AttributeKey<?>, Object> attrs = attrs0();
          synchronized (attrs) {
              for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                  @SuppressWarnings("unchecked")
                  AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                  channel.attr(key).set(e.getValue());
              }
          }
      
          ChannelPipeline p = channel.pipeline();
      
          final EventLoopGroup currentChildGroup = childGroup;
          final ChannelHandler currentChildHandler = childHandler;
          final Entry<ChannelOption<?>, Object>[] currentChildOptions;
          final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
          synchronized (childOptions) {
              currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
          }
          synchronized (childAttrs) {
              currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
          }
      
          p.addLast(new ChannelInitializer<Channel>() {
              @Override
              public void initChannel(final Channel ch) throws Exception {
                  final ChannelPipeline pipeline = ch.pipeline();
                  ChannelHandler handler = config.handler();
                  if (handler != null) {
                      pipeline.addLast(handler);
                  }
      
                  ch.eventLoop().execute(new Runnable() {
                      @Override
                      public void run() {
                          pipeline.addLast(new ServerBootstrapAcceptor(
                                  ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                      }
                  });
              }
          });
      }
      

    ServerBootstrapAcceptor类

    ServerBootstrapAcceptor类是ServerBootstrap类的私有静态内部类,用于充当Reactor模式中的Acceptor角色,它继承了ChannelInboundHandlerAdapter类:

    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    
        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
        private final Runnable enableAutoReadTask;
    
        ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;
            // 省略一些代码
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
            setChannelOptions(child, childOptions, logger);
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
    
        // 省略一些代码
    }
    

    注意channelRead方法:

    • final Channel child = (Channel) msg; 表明它的入站消息是一个通道;为什么是一个通道呢?请看Netty学习 - EventLoop
    • child.pipeline().addLast(childHandler); 接受连接后才将已连接通道的处理器(即ServerBootstrap的childHandler方法中的参数)添加到已连接通道的流水线上,并设置选项;
    • 将已连接通道注册到childGroup(从Reactor)上。

    ServerBootstrap引导的结果是将ServerSocketChannel注册到group变量(即所谓的bossGroup)表示的EventLoopGroup里的一个EventLoop上,即ServerBootstrapAcceptor只运行于一个EventLoop里。

    相关文章

      网友评论

        本文标题:Netty学习 - Bootstrap引导

        本文链接:https://www.haomeiwen.com/subject/mraupftx.html