Bootstrap 是 Netty 提供的一个便利的工厂类, 我们可以通过它来完成 Netty 的客户端或服务器端的 Netty 初始化.利用BootStrap我们可以实现创建channel,把channel注册在EventLoop上,发起连接等功能.
BootStrap的类结构如下:
image.png
1. Client端启动实例
下面是个简单的客户端实例,我们用这个来分析BootStrap的整个流程.
public class Client {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyProtocolEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = b.connect(HOST, PORT).sync();
future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
2. group()
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group; //设置成员变量group 为传入的group
return self();
}
- 这里设置
EventLoopGroup
,是为了以后注册和handle事件做准备,EventLoopGroup
可以理解成一个线程池.在后面注册和handler事件的时候,会从EventLoopGroup
取线程处理.
3. channel()
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
- 这里并不是返回channel,而是返回一个channelFactory,利用工厂方法构造channel.而下面这个则是一个channelFactory,他是根据传入的Class,通过反射构造channel.
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
//通过返回获取channel实例
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
4. option()
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) { //value为null,则表示删除这个option
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return self();
}
- 为Channel设置一些可选的性质.当value为null的时候表示删除这个option.
5. handler()
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return self();
}
- 设置handler,这里handler是用户自定义处理连接逻辑.例如编码器或者自定义的handler.通常来说我们通过
ChannelInitializer
的init
来添加handler.
6. connect()
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
//检验各个part是否准备好
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
}
- 先验证各个part是否准备好,然后再发起连接.
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); //1
final Channel channel = regFuture.channel(); //获取channel
if (regFuture.isDone()) { //异步的结果返回
if (!regFuture.isSuccess()) { //不成功
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
//异步结果还没出来,添加监听器来监听
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause(); //异步结果
if (cause != null) {
//注册失败
promise.setFailure(cause);
} else {
// 注册成功了.
promise.registered();
// 发起连接
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
- 上面是整个注册,连接的逻辑.下面这部分单独把注册部分拿出来.
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel(); //创建实例
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
//如果到这里还没注册channel,则强制使用GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
//如果到这里还没注册channel,则强制使用GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//在这里异步注册Channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close(); //已经注册成功了
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
- 上面是整个注册的逻辑,采用是异步的策略,也就是说我们可以在程序中,根据监听器的结果来判断注册是否成功.
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
- 在这里初始化channel.并向
channelPipeline
中添加handler.为channel设置option和Attribute
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
//获取到该channel绑定的EventLoop
final EventLoop eventLoop = channel.eventLoop();
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
//已经解析了,或者没有办法解析.
doConnect(remoteAddress, localAddress, promise);
return promise;
}
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) { //返回异步解析的结果
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {
// 不能立即解析
channel.close();
promise.setFailure(resolveFailureCause);
} else {
// 成功解析,则连接
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
// 没有立刻解析,则添加监听器等待解析的结果
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
//解析失败
channel.close();
promise.setFailure(future.cause());
} else {
// 解析成功,发起连接.
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
- 以上是异步解析地址.
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
//本地地址
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
- 上面这部分是真正的异步连接服务器.
7. 总结
通过上面的叙述,我们不难看出来,BootStrap所做的3件事.无非在这过程中,多次利用异步来获取结果.
- 创建channel,并初始化
- 注册channel
- 连接到服务器
网友评论