1 概述
在介绍了Netty服务端启动之后(参考笔者文章Netty源码-服务端启动过程),再看Netty的客户端启动会发现二者十分类似,服务端启动通过调用了ServerBootstrap.bind
方法开启,而客户端启动则通过调用Bootstrap.connect
方法启动。
本文的介绍比较简单,因为许多操作和服务端启动一致,就没有详细介绍,读者可集合服务端启动过程一起理解。
2 客户端的典型编码
和介绍服务端一样,我们先看一下客户端的典型编码:
public class TimeClient {
public void connect(int port, String host) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
//客户端只要准备一个group即可
b.group(group)
//注册客户端使用的channel
.channel(NioSocketChannel.class)
//设置客户端channel选项和属性
.option(ChannelOption.TCP_NODELAY, true)
.attr(AttributeKey.valueOf("attrKey"), "attrValue")
//注册客户端pipelne中的handler
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
//调用connect启动客户端
ChannelFuture f = b.connect(host, port).sync();
} finally {
//优雅停机
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new TimeClient().connect(8080, "127.0.0.1");
}
}
3 一些配置函数
在第1节我们提到了Bootstrap
和ServerBootstrap
类,Bootstrap
主要负责客户端的启动,而ServerBootstrap
则主要负责服务端的启动,我们在Netty源码-服务端启动过程第3节一些配置函数介绍了ServerBootstrap
的一些常用配置函数,因为ServerBootstrap
即需要配置Accept线程和Server channel,又需要配置客户端连接的线程和客户端channel,所以ServerBootstrap
的配置方法都是option
和childOption
、handler
和childHandler
这样成对出现的,因为Bootstrap
主要负责客户端启动,所以只需要配置客户端线程和channel即可,所以其配置方法则没有child*
这一类。相关配置方法在文章Netty源码-服务端启动过程也都介绍过,本文也就不再介绍了。
4 客户端启动
客户端的启动由Bootstrap.connect
方法开启,下面看其源码:
//Bootstrap
/**
* Connect a {@link Channel} to the remote peer.
*/
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
/**
* Connect a {@link Channel} to the remote peer.
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
//验证一些必要配置是否都已经配置过
validate();
//进行地址解析和实际的连接
return doResolveAndConnect(remoteAddress, config.localAddress());
}
@Override
public Bootstrap validate() {
//父类中的验证主要是保证group和channelFactory不为空
super.validate();
//验证设置了handler
if (config.handler() == null) {
throw new IllegalStateException("handler not set");
}
return this;
}
//客户端的启动也分为三个步骤,第一为初始化通道,第二为向
//EventLoopGroup中的某个NioEventLoop持有的Selector注册
//通道,第三个为解析地址和连接
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//这里完成了第一和第二步骤,即初始化和注册
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
//这里完成第三个步骤:解析地址和连接
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
4.1 Channel初始化和注册
其实客户端Channel初始化和注册实现与服务端基本一样,在初始化时会调用AbstractBootstrap.init
方法,这个方法根据在具体的子类中进行了重写,客户端的子类为Bootstrap
,其实现如下:
//Bootstrap
//逻辑比较简单,首先向客户端channel的pipeline添加handler
//然后进行选项和attr的设置
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
除此之外,Channel初始化和注册与服务端一样,可参见笔者文章Netty源码-服务端启动过程4.1节相关内容,这里不再介绍。
doResolveAndConnect0
方法主要完成第三个步骤,解析服务器地址并连接:
//Bootstrap
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
//首先进行域名解析(如果指定的服务端地址时域名而不是
//IP时需要进行解析)
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
// Resolver has no idea about what to do with the specified remote address or it's resolved already.
//解析成功之后进行连接操作
doConnect(remoteAddress, localAddress, promise);
return promise;
}
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) {
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {
// Failed to resolve immediately
channel.close();
promise.setFailure(resolveFailureCause);
} else {
// Succeeded to resolve immediately; cached? (or did a blocking lookup)
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
// Wait until the name resolution is finished.
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
promise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
4.2 地址解析和连接
在完成第一和第二个步骤之后,通道初始化和注册都已经完成,Bootstrap
就会调用doResolveAndConnect0
方法解析服务器地址并连接。
4.2.1 地址解析
因为我们在指定服务端地址不仅可以使用IP,还可以使用域名,所以我们在指定域名时,就需要Netty将其解析为IP地址,实现逻辑也比较简单,默认的解析器为DefaultNameResolver
,根据域名解析出IP调用的方法为java.net.InetAddress.getAllByName(hostname)
,这里也不再展开介绍。
4.2.2 连接
在将域名(如果配置的服务端地址为域名而不是IP时会进行解析操作)解析为IP之后,会调用doConnect
进行连接操作:
//Bootstrap
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
//直接调用通道的connect方法
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
从上面的源码可以看出,连接操作直接通过调用Channel.connect
方法完成,Channel.connect
方法我们直接看起子类AbstractChannel
中的实现:
//AbstractChannel
//connect都是直接通过调用Pipeline的connect进行连接操作
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
根据Netty源码-ChannelPipeline和ChannelHandler中的介绍,connect
方法属于Outbound事件,所以最终会调用HeadContext.connect
方法:
//HeadContext
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
HeadContext.connect
方法调用了Unsafe.connect
方法,我们看其在子类AbstractNioUnsafe
中的实现:
//AbstractNioUnsafe
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
//这里调用了外部类AbstractNioChannel.doConnect
//方法执行实际的连接动作
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
因为这里介绍的是客户端的启动,所以我们看AbstractNioChannel.doConnect
在其子类NioSocketChannel
中的实现:
//NioSocketChannel
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
//如何本地地址不为空,表示要进行本地地址绑定
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
//最终调用了java channel.connect方法
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
//本地地址绑定,调用java channel的bind方法进行绑定
private void doBind0(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
SocketUtils.bind(javaChannel(), localAddress);
} else {
SocketUtils.bind(javaChannel().socket(), localAddress);
}
}
网友评论