netty server端
以netty官方EchoServer服务器端的启动代码分析:
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);//用于处理客户端的连接请求
EventLoopGroup workerGroup = new NioEventLoopGroup();//用于处理与各个客户端连接的 IO 操作
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
//负责处理客户端的连接请求
.handler(new LoggingHandler(LogLevel.INFO))
//负责和客户端的连接的 IO 交互
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
和客户端的代码相比, 没有很大的差别, 基本上也是进行了如下几个部分的初始化:
EventLoopGroup: 不论是服务器端还是客户端, 都必须指定 EventLoopGroup. 在这个例子中, 指定了 NioEventLoopGroup, 表示一个 NIO 的EventLoopGroup, 不过服务器端需要指定两个 EventLoopGroup, 一个是 bossGroup, 用于处理客户端的连接请求; 另一个是 workerGroup, 用于处理与各个客户端连接的 IO 操作.
ChannelType: 指定 Channel 的类型. 因为是服务器端, 因此使用了 NioServerSocketChannel.
Handler: 设置数据的处理器.
- Channel 的初始化
group.channel(NioServerSocketChannel.class)
根据源码以及在分析客户端源码很容易看出来服务端channel的初始化
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
这是会将ServerBootstrap属性ChannelFactory初始化ReflectiveChannelFactory且clazz为NioServerSocketChannel
至于类型也即NioServerSocketChannel
NioServerSocketChannel的类图如下:
图片.png
unsafe 字段其实是一个 AbstractNioMessageChannel#AbstractNioUnsafe 的实例.
我们来总结一下, 在 NioServerSocketChannsl 实例化过程中, 所需要做的工作:
调用 NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO ServerSocketChannel
AbstractChannel(Channel parent) 中初始化 AbstractChannel 的属性:
parent 属性置为 null
unsafe 通过newUnsafe() 实例化一个 unsafe 对象, 它的类型是 AbstractNioMessageChannel#AbstractNioUnsafe 内部类
pipeline 是 new DefaultChannelPipeline(this) 新创建的实例.
AbstractNioChannel 中的属性:
SelectableChannel ch 被设置为 Java ServerSocketChannel, 即 NioServerSocketChannel#newSocket 返回的 Java NIO ServerSocketChannel.
readInterestOp 被设置为 SelectionKey.OP_ACCEPT
SelectableChannel ch 被配置为非阻塞的 ch.configureBlocking(false)
NioServerSocketChannel 中的属性:
ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())
-
ChannelPipeline 初始化和客户端一致,在创建channel时会创建pipeline
-
bossGroup 与 workerGroup
这里的bossGroup和workerGroup。就是前面介绍nio reactor模式的多线程版本 可参考
图片.png
bossGroup 不断地监听是否有客户端的连接, 当发现有一个新的客户端连接到来时, bossGroup 就会为此连接初始化各项资源, 然后从 workerGroup 中选出一个 EventLoop 绑定到此客户端连接中. 那么接下来的服务器与客户端的交互过程就全部在此分配的 EventLoop 中
源码:
1、 初始化EventLoopGroup
/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//和客户端的相同
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
2、 group是如何和channel关联
- 通过客户端分析可知在connect,而服务端为bind方法绑定
AbstractBootstrap.bind -> AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister
源码:
// 实例化channel和 channel 的注册过程
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//1、config()方法为子类Bootstrap实现返回BootstrapConfig对象。
// 2、BootstrapConfig中group()其实调用的为其父类 AbstractBootstrapConfig中的group()方法,该方法中返回bootstrap.group()即bootstrap中我们添加的group
//3、也就是最终调用NioEventLoopGroup的父类MultithreadEventExecutorGroup的register方法。该方法返回 next().register(channel);
// 4、 next()方法MultithreadEventExecutorGroup的父类MultithreadEventExecutorGroup实现的。该方法返回 chooser.next(); 这里的
//这里的chooser是DefaultEventExecutorChooserFactory由方法chooserFactory.newChooser(children)返回;可参考(NioEventLoopGroup的父类MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup类的构造器this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);)
//我们还记得最初NioEventLoopGroup的构造器最终会调用MultithreadEventExecutorGroup的构造器MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)
// 在构造器中有一个children[i] = newChild(executor, args);方法格外引起我们的注意。因为这里的前面的next方法返回children数组中的值,newChild方法的实现类在NioEventLoopGroup中
//它返回return new NioEventLoop(this, executor, (SelectorProvider) args[0],
// ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);最后NioEventLoop的父类SingleThreadEventLoop 父类中的注册方法register,该方法调用到第四步
// 5、promise.channel().unsafe().register(this, promise);获取 channel 的 unsafe() 底层操作对象, 然后调用它的 register.
//6、在 AbstractUnsafe.register 方法中, 调用 register0 方法注册 Channel。AbstractUnsafe.register0 中, 调用 AbstractNioChannel#doRegister 方法
//7、AbstractNioChannel.doRegister 方法通过 javaChannel().register(eventLoop().selector, 0, this) 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment.
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
}
这里 group() 方法返回的是上面 bossGroup, 至于channel 是一个 NioServerSocketChannsl 实例, 因此我们可以知道, group().register(channel) 将 bossGroup 和 NioServerSocketChannsl 关联.
剩下的workerGroup 是在哪里与 NioSocketChannel 关联的呢?
我们继续看 init(channel) 方法,在子类ServerBootstrap中实现:
//这里初始化的为nioserverchannel
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
//currentChildGroup、currentChildHandler客户端的连接的 IO 交互
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
上面代码中init 方法在 ServerBootstrap 中重写了, 从上面的代码片段中我们看到, 它为 pipeline 中添加了一个 ChannelInitializer, 而这个 ChannelInitializer 中添加了一个关键的 ServerBootstrapAcceptor handler.
关于 handler 的添加与初始化的过程, 我们留待下一小节中分析, 我们现在关注一下 ServerBootstrapAcceptor 类.ServerBootstrapAcceptor 中重写了 channelRead 方法, 其主要代码
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
...
childGroup.register(child).addListener(...);
}
ServerBootstrapAcceptor 中的 childGroup 是构造此对象是传入的 currentChildGroup, 即我们的 workerGroup,
而 Channel 是一个 NioSocketChannel 的实例, 因此这里的 childGroup.register 就是将 workerGroup 中的摸个 EventLoop 和 NioSocketChannel 关联了.
, ServerBootstrapAcceptor.channelRead 方法是怎么被调用的呢? 其实当一个 client 连接到 server 时,Java 底层的 NIO ServerSocketChannel 会有一个 SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 NioServerSocketChannel.doReadMessages:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
... 省略异常处理
buf.add(new NioSocketChannel(this, ch));
return 1;
}
在 doReadMessages 中, 通过 javaChannel().accept() 获取到客户端新连接的 SocketChannel, 接着就实例化一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this), 由此可知, 我们创建的这个 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 .
接下来就经由 Netty 的 ChannelPipeline 机制, 将读取事件逐级发送到各个 handler 中, 于是就会触发前面我们提到的 ServerBootstrapAcceptor.channelRead 方法啦
简单用一个图展示下:
图片.pnghandler 的添加
服务器端的 handler 的添加过程和客户端的有点区别
EventLoopGroup 一样, 服务器端的 handler 也有两个, 一个是通过 handler() 方法设置 handler 字段, 另一个是通过 childHandler() 设置 childHandler 字段
分析上面ServerBootstrap 重写了 init 方法
pipeline中由
head<->ChannelInitializer<->tail
在客户端分析ChannelInitializer,当channel绑定到eventLoop后(在这里是 NioServerSocketChannel 绑定到 bossGroup)中时, 会在pipeline中发出fireChannelRegistered 事件, 接着就会触发 ChannelInitializer.initChannel 方法的调用.
然后变成
head<->LoggingHandler<->ServerBootstrapAcceptor<->tail
上面是server的handler,那么childHandler 是在哪里添加的呢?还记得ServerBootstrapAcceptor这个链接的handler,很明显这里是和客户端交互的地方。
先说结论,根据源码在ServerBootstrapAcceptor中channelRead方法将我们childhandler添加。
那么就带来两个问题。1 child channle是如何从channelRead获取。2、channelRead是何时调用
不过这两个问题通过源码可以一并解决:
首先是知道从那里来。ServerBootstrapAcceptor继承ChannelInboundHandlerAdapter,作为inbound的handler,并且重写channelRead
那么当客户端发送数据到客户端时,对于服务端就是接收读取的io事件,那么就会执行channelRead这个方法(这里就很清楚io一般时阻塞,所以在channelRead
方法中childHandler一般是我们自己实现的io操作handler,将这个handler绑定childChannel上处理,且处理他的线程为childGroup)这么做的用途很明显,
//inbound事件到来时,这里就是客户端IO
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//将操作io的handler绑定到childGroup,执行完成后断开childchannel
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
- 总结: 从上面分析服务端有两个pipeline。ServerBootstrapAcceptor只负责接受客户端的connect也就是服务端的accept事件(所以上面方法的入参msg为客户端的channel)
链接后将后续的io相关的childHandler 绑定到这个childChannel上且注册childGroup
网友评论