美文网首页
netty Bootstrap

netty Bootstrap

作者: dinel | 来源:发表于2020-04-13 15:09 被阅读0次

1.1 Bootstrap的作用

Bootstrap的作用可以参考AbstractBootstrap的javadoc:

    AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.

Bootstrap存在的意义就是为了方便的"引导"Channel.

在netty中, 存在两种类型的Channel, 因此也对应有两种Bootstrap

channel类型 用于引导的bootstrap实现类
ServerChannel ServerBootstrap
Channel Bootstrap

1.2 Bootstrap的继承结构

在netty的代码中, 类ServerBootstrap和类Bootstrap都继承自基类AbstractBootstrap:


图片.png

2.类AbstractBootstrap

2.1 类定义

AbstractBootstrap是Bootstrap的基类, 类定义如下:

package io.netty.bootstrap;

public abstract class AbstractBootstrap
<B extends AbstractBootstrap<B, C>, C extends Channel> 
implements Cloneable {}

类定义中的泛型B要求是AbstractBootstrap的子类, 而泛型C要求是Channel的子类.
注意这里的泛型的用法,非常的巧妙。

2.2 成员变量

group属性

volatile EventLoopGroup group;

public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    }
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group;
    return (B) this;
}

public EventLooattrspGroup group() {
    return group;
}

注意this.group只能设置一次, 这意味着group(group)方法只能被调用一次.

localAddress属性

localAddress用于绑定本地终端, 有多个设值的方法:

private volatile SocketAddress localAddress;

public B localAddress(SocketAddress localAddress) {
    this.localAddress = localAddress;
    return (B) this;
}

public B localAddress(int inetPort) {
    return localAddress(new InetSocketAddress(inetPort));
}

public B localAddress(String inetHost, int inetPort) {
    return localAddress(new InetSocketAddress(inetHost, inetPort));
}

public B localAddress(InetAddress inetHost, int inetPort) {
    return localAddress(new InetSocketAddress(inetHost, inetPort));
}

final SocketAddress localAddress() {
    return localAddress;
}

这些重载的localAddress(), 最终都指向了InetSocketAddress的几个构造函数.

options属性

options属性是一个LinkedHashMap, option()方法用于设置单个的key/value, 如果value为null则删除该key.

private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

public <T> B option(ChannelOption<T> option, T value) {
    if (option == null) {
        throw new NullPointerException("option");
    }
    if (value == null) {
        synchronized (options) {
            options.remove(option);
        }
    } else {
        synchronized (options) {
            options.put(option, value);
        }
    }
    return (B) this;
}

final Map<ChannelOption<?>, Object> options() {
    return options;
}

attrs属性

attrs和options属性类似.

private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();

public <T> B attr(AttributeKey<T> key, T value) {
    if (key == null) {
        throw new NullPointerException("key");
    }
    if (value == null) {
        synchronized (attrs) {
            attrs.remove(key);
        }
    } else {
        synchronized (attrs) {
            attrs.put(key, value);
        }
    }
    return (B) this;
}

final Map<AttributeKey<?>, Object> attrs() {
    return attrs;
}

handler属性

private volatile ChannelHandler handler;

public B handler(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
    return (B) this;
}

final ChannelHandler handler() {
    return handler;
}

channelFactory属性

新旧两个ChannelFactory

channelFactory这个属性有点麻烦, 根源在于ChannelFactory这个类,netty中有新旧两个ChannelFactory,具体介绍见 Channel Factory

混合使用

但是现在的情况是内部已经转为使用新类, 对外的接口还是继续保持使用原来的旧类, 因此代码有些混乱:

// 这里的channelFactory的类型定义用的是旧类,因此需要加SuppressWarnings
@SuppressWarnings("deprecation")
private volatile ChannelFactory< ? extends C> channelFactory;

// 返回的channelFactory也是用的旧类, 没的说, 继续SuppressWarnings
@SuppressWarnings("deprecation")
final ChannelFactory< ? extends C> channelFactory() {
    return channelFactory;
}

// 这个方法的参数是旧的"io.netty.bootstrap.ChannelFactory",已经被标志为"@Deprecated",尽量用下面的方法
@Deprecated
public B channelFactory(ChannelFactory< ? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return (B) this;
}

// 这个方法是现在推荐使用的设置channelFactory的方法, 使用新类"io.netty.channel.ChannelFactory"
@SuppressWarnings({ "unchecked", "deprecation" })
public B channelFactory(io.netty.channel.ChannelFactory< ? extends C> channelFactory) {
    // 但是底层的实现还是调用回上面被废弃的channelFactory()方法
    // 因为新类是继承自旧类的,所有只要简单转一下类型就好
    return channelFactory((ChannelFactory<C>) channelFactory);
}

此外还有一个channel()方法可以非常方便的设置channelFactory:

public B channel(Class< ? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

2.3 类方法

validate()

validate()用于检验所有的参数, 实际代码中检查的是group和channelFactory两个参数, 这两个参数必须设置不能为空:

public B validate() {
    if (group == null) {
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return (B) this;
}

register()

register()方法创建一个新的Channel并将它注册到EventLoop, 在执行前会调用validate()方法做前置检查:

public ChannelFuture register() {
    validate();
    return initAndRegister();
}

initAndRegister()是关键代码, 细细读一下:

final ChannelFuture initAndRegister() {
    // 创建一个新的Channel
    final Channel channel = channelFactory().newChannel();
    try {
        // 调用抽象方法, 子类来做初始化
        init(channel);
    } catch (Throwable t) {
        // 如果出错, 强行关闭这个channel
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 创建成功则将这个channel注册到eventloop中
    ChannelFuture regFuture = group().register(channel);
    // 如果注册出错
    if (regFuture.cause() != null) {
        // 判断是否已经注册
        if (channel.isRegistered()) {
            // channel已经注册的就关闭
            channel.close();
        } else {
            // 还没有注册的就强行关闭
            channel.unsafe().closeForcibly();
        }
    }

    // 如果代码走到这里而且promise没有失败, 那么是下面两种情况之一:
    // 1) 如果我们尝试了从event loop中注册, 那么现在注册已经完成
    //    现在可以安全的尝试 bind()或者connect(), 因为channel已经注册成功
    // 2) 如果我们尝试了从另外一个线程中注册, 注册请求已经成功添加到event loop的任务队列中等待后续执行
    //    现在可以安全的尝试 bind()或者connect():
    //         因为 bind() 或 connect() 会在安排的注册任务之后执行
    //         而register(), bind(), 和 connect() 都被确认是同一个线程

    return regFuture;
}

中途调用的init()方法定义如下, 后面看具体子类代码时再展开.

abstract void init(Channel channel) throws Exception;

bind()

bind()方法有多个重载, 差异只是bind操作所需的InetSocketAddress参数从何而来而已:

  从属性this.localAddress来

  这个时候bind()方法无需参数, 直接使用属性this.localAddress, 当前调用之前this.localAddress必须有赋值(通过函数localAddress()):

 public ChannelFuture bind() {
     validate();
     SocketAddress localAddress = this.localAddress;
     if (localAddress == null) {
         throw new IllegalStateException("localAddress not set");
     }
     return doBind(localAddress);
 }

从bind()方法的输入参数中来

在输入参数中来直接指定localAddress:

 public ChannelFuture bind(SocketAddress localAddress) {
     validate();
     if (localAddress == null) {
         throw new NullPointerException("localAddress");
     }
     return doBind(localAddress);
 }

另外为了方便, 重载了下面三个方法, 用不同的方式来创建InetSocketAddress而已:

public ChannelFuture bind(int inetPort) {
     return bind(new InetSocketAddress(inetPort));
 }

 public ChannelFuture bind(String inetHost, int inetPort) {
     return bind(new InetSocketAddress(inetHost, inetPort));
 }

 public ChannelFuture bind(InetAddress inetHost, int inetPort) {
     return bind(new InetSocketAddress(inetHost, inetPort));
 }

注: 使用带参数的bind()方法, 忽略了localAddress()设定的参数. 而且也没有设置localAddress属性. 这里的差异, 后面留意.

继续看doBind()方法的细节, 这个依然是这个类的核心内容:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 调用initAndRegister()方法, 先初始化channel,并注册到event loop
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // 检查注册的channel是否出错
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 检查注册操作是否完成
    if (regFuture.isDone()) {
        // 如果完成
        // 在这个点上我们知道注册已经完成并且成功
        // 继续bind操作, 创建一个ChannelPromise
        ChannelPromise promise = channel.newPromise();
        // 调用doBind0()方法来继续真正的bind操作
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // 通常这个时候注册的future应该都已经完成,但是万一没有, 我们也需要处理
        // 为这个channel创建一个PendingRegistrationPromise
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // 然后给注册的future添加一个listener, 在operationComplete()回调时
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                // 检查是否出错
                if (cause != null) {
                    // 在event loop上注册失败, 因此直接让ChannelPromise失败, 避免一旦我们试图访问这个channel的eventloop导致IllegalStateException
                    promise.setFailure(cause);
                } else {
                    // 注册已经成功, 因此设置正确的executor以便使用
                    // 注: 这里已经以前有过一个bug, 有issue记录
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                // 调用doBind0()方法来继续真正的bind操作
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

关键的doBind0()方法

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {

    // 这个方法在channelRegistered()方法触发前被调用.
    // 让handler有机会在它的channelRegistered()实现中构建pipeline
    // 给channel的event loop增加一个一次性任务
    channel.eventLoop().execute(new OneTimeTask() {
        @Override
        public void run() {
            // 检查注册是否成功
            if (regFuture.isSuccess()) {
                // 如果成功则绑定localAddress到channel
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                // 如果不成功则设置错误到promise
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

3 类Bootstrap

类Bootstrap用于帮助客户端引导Channel.

bind()方法用于无连接传输如datagram (UDP)。对于常规TCP链接,用connect()方法。

3.1 类定义

package io.netty.bootstrap;

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {}

3.2 类成员

resolver属性

resolver默认设置为DefaultAddressResolverGroup.INSTANCE, 可以通过resolver()方法来赋值:

private static final AddressResolverGroup< ? > DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE;
private volatile AddressResolverGroup<SocketAddress> resolver = (AddressResolverGroup<SocketAddress>) DEFAULT_RESOLVER;

public Bootstrap resolver(AddressResolverGroup< ? > resolver) {
    if (resolver == null) {
        throw new NullPointerException("resolver");
    }
    this.resolver = (AddressResolverGroup<SocketAddress>) resolver;
    return this;
}

remoteAddress属性

remoteAddress可以通过remoteAddress()方法赋值, 有多个重载方法:

private volatile SocketAddress remoteAddress;

public Bootstrap remoteAddress(SocketAddress remoteAddress) {
    this.remoteAddress = remoteAddress;
    return this;
}
public Bootstrap remoteAddress(String inetHost, int inetPort) {
    remoteAddress = InetSocketAddress.createUnresolved(inetHost, inetPort);
    return this;
}
public Bootstrap remoteAddress(InetAddress inetHost, int inetPort) {
    remoteAddress = new InetSocketAddress(inetHost, inetPort);
    return this;
}

3.3 类方法

validate()方法

重写了validate()方法, 在调用AbstractBootstrap的validate()方法(检查group和channelFactory)外, 增加了对handler的检查:

@Override
public Bootstrap validate() {
    super.validate();
    if (handler() == null) {
        throw new IllegalStateException("handler not set");
    }
    return this;
}

connect()方法

有多个connect()方法重载, 功能都是一样, 拿到输入的remoteAddress然后调用doResolveAndConnect()方法:

private ChannelFuture doResolveAndConnect(SocketAddress remoteAddress, final SocketAddress localAddress) {
    // 先初始化channel并注册到event loop
    final ChannelFuture regFuture = initAndRegister();
    if (regFuture.cause() != null) {
        // 如果注册失败则退出
        return regFuture;
    }

    final Channel channel = regFuture.channel();
    final EventLoop eventLoop = channel.eventLoop();
    final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

    if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
        // Resolver 不知道该怎么处理给定的远程地址, 或者已经解析
        return doConnect(remoteAddress, localAddress, regFuture, channel.newPromise());
    }

    // 开始解析远程地址
    final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
    final Throwable resolveFailureCause = resolveFuture.cause();

    if (resolveFailureCause != null) {
        // 如果地址解析失败, 则立即失败
        channel.close();
        return channel.newFailedFuture(resolveFailureCause);
    }

    if (resolveFuture.isDone()) {
        // 理解成功的解析了远程地址, 开始做连接
        return doConnect(resolveFuture.getNow(), localAddress, regFuture, channel.newPromise());
    }

    // 地址解析还没有完成, 只能等待完成后在做connectio, 增加一个promise来操作
    final ChannelPromise connectPromise = channel.newPromise();
    resolveFuture.addListener(new FutureListener<SocketAddress>() {
        @Override
        public void operationComplete(Future<SocketAddress> future) throws Exception {
            if (future.cause() != null) {
                channel.close();
                connectPromise.setFailure(future.cause());
            } else {
                doConnect(future.getNow(), localAddress, regFuture, connectPromise);
            }
        }
    });

    return connectPromise;
}

doConnect()方法中才是真正的开始处理连接操作, 但是还是需要检查注册操作是否完成:

private static ChannelFuture doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress,
        final ChannelFuture regFuture, final ChannelPromise connectPromise) {
    // 判断一下前面的注册操作是否已经完成
    // 因为注册操作是异步操作, 前面只是返回一个feature, 代码执行到这里时, 可能完成, 也可能还在进行中
    if (regFuture.isDone()) {
        // 如果注册已经完成, 可以执行连接了
        doConnect0(remoteAddress, localAddress, regFuture, connectPromise);
    } else {
        // 如果注册还在进行中, 增加一个ChannelFutureListener, 等操作完成之后再在回调方法中执行连接操作
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doConnect0(remoteAddress, localAddress, regFuture, connectPromise);
            }
        });
    }

    return connectPromise;
}

异步操作就是这点比较麻烦, 总是需要一个一个future的做判断/处理, 如果没有完成还的加promise/future来依靠回调函数继续工作处理流程.

终于到了最后的doConnect0()方法, 总算可以真的连接了:

private static void doConnect0(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture regFuture,
        final ChannelPromise connectPromise) {
    // 这个方法在channelRegistered()方法被触发前调用.
    // 给我们的handler一个在它的channelRegistered()实现中构建pipeline的机会
    final Channel channel = connectPromise.channel();
    // 取当前channel的eventlopp, 执行一个一次性任务
    channel.eventLoop().execute(new OneTimeTask() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // 如果注册成功
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                connectPromise.setFailure(regFuture.cause());
            }
        }
    });
}

init(channel)方法

前面看基类AbstractBootstrap时看到过, 这个init()方法是一个模板方法, 需要子类做具体实现.

看看Bootstrap是怎么做channel初始化的:

@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    // 取channel的ChannelPipeline
    ChannelPipeline p = channel.pipeline();
    // 增加当前Bootstrap的handle到ChannelPipeline中
    p.addLast(handler());

    // 取当前Bootstrap设置的options, 逐个设置到channel中
    final Map<ChannelOption< ? >, Object> options = options();
    synchronized (options) {
        for (Entry<ChannelOption< ? >, Object> e: options.entrySet()) {
            try {
                if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + channel, t);
            }
        }
    }

    // 同样取当前Bootstrap的attrs, 逐个设置到channel中
    final Map<AttributeKey< ? >, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey< ? >, Object> e: attrs.entrySet()) {
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
}

总结上在init()方法中, Bootstrap只做了一个事情: 将Bootstrap中保存的信息(handle/options/attrs)设置到新创建的channel.

clone()

深度克隆当前Bootstrap对象,有完全一样的配置,但是使用给定的EventLoopGroup。

这个方法适合用相同配置创建多个Channel。

public Bootstrap clone(EventLoopGroup group) {
        Bootstrap bs = new Bootstrap(this);
        bs.group = group;
        return bs;
    }

4 类ServerBootstrap

类ServerBootstrap用于帮助服务器端引导ServerChannel.

ServerBootstrap除了处理ServerChannel外, 还需要处理从ServerChannel下创建的Channel.Netty中称这两个关系为parent和child.

4.1 类定义

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {}

4.2 类属性

childGroup属性

childGroup属性用于指定处理客户端连接的EventLoopGroup, 设置的方式有两种:

group(parentGroup, childGroup)方法用于单独设置parentGroup, childGroup, 分别用于处理ServerChannel和Channel.
group(group)方法设置parentGroup, childGroup为使用同一个EventLoopGroup. 注意这个方法覆盖了基类的方法.

private volatile EventLoopGroup childGroup;

@Override
public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}
public EventLoopGroup childGroup() {
    return childGroup;
}

4.3 childOptions/childAttrs/childHandler属性

这三个属性和parent的基本对应, 设值方法和检验都是一模一样的:

private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();

private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();

private volatile ChannelHandler childHandler;

4.4 类方法

init()方法

ServerBootstrap的init(channel)方法相比Bootstrap的要复杂一些, 除了设置options/attrs/handler到channel外, 还需要为child设置childGroup, childHandler, childOptions, childAttrs:

@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption< ? >, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey< ? >, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey< ? >, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption< ? >, Object>[] currentChildOptions;
    final Entry<AttributeKey< ? >, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

ServerBootstrapAcceptor的实现, 主要看channelRead()方法:

private static class ServerBootstrapAcceptor extends ChannelHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 获取child channel
        final Channel child = (Channel) msg;
        // 设置childHandler到child channel
        child.pipeline().addLast(childHandler);
        // 设置childOptions到child channel
        for (Entry<ChannelOption< ? >, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        // 设置childAttrs到child channel
        for (Entry<AttributeKey< ? >, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        // 将child channel注册到childGroup
        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
}

相关文章

网友评论

      本文标题:netty Bootstrap

      本文链接:https://www.haomeiwen.com/subject/xnogmhtx.html