服务启动
服务启动可分为以下几步:
①:Boostrap.bind()
②:创建NioServerSocketChannel
③:将NioServerSocketChannel注册到EventLoopGroup(这里的EventLoopGroup指的是我们前面服务创建时指定的bossGroup)
下面结合源码进行分析:
ServerBoostrap-bind源码分析
//bind(port)最终调用doBind方法
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
//在doBind方法,主要看其第一行代码,intAndRegister方法:初始化NioServerSocketChannel并注册
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
.....
}
/**
* 服务端NioServerSocketChannel的创建、初始化、注册
* @return
*/
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//这里的newChannel其实就是对我们之前channel方法传入的class进行实例化,即NioServerSocketChannel的创建
channel = channelFactory().newChannel();
//NioServerSocketChannle的初始化
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//服务端Channel-NioServerSocketChannel的注册到EventLoop上
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
NioServerSocketChannel的创建
/**
* NioServerSocketChannel的创建,服务端socketChannel的创建包含的内容:
* 实例化的时候注册OP_Accept事件
* 默认pipeline由DefaultChannelPipeline提供,其默认只包含HeadContext和TailContext两个不含ChannelHandler的AbstratChannelHandlerContext组成。
*/
/**
* NioServerSocketChannel默认的构造函数
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* 最终调用的构造器函数,可发现其①首先注册了OP_Accept事件,具体的操作发生在父类AbstractNioChannel的构造函数中
* 而创建默认的Pipeline的动作则发生在父类AbstractChannel的构造函数中
* ②然后初始化了一些服务端的基本配置
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//AbstractNioChannel父类中:Channel感兴趣的事件
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;//感兴趣的事件,后面,将Channel注册到EventLoop上的时候会用到
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
//AbstractChannel父类中:创建默认的Pipeline
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
NioServerSocketChannel的初始化
NioServerSocketChannel的初始化,除了初始化一些参数外,还为pipeline添加一个ChannelInitializer的handler,这个特殊的handler会在channel register的时候被调用,然后回调initChannel方法,回调完成之后,会将当前channel从所属的pipeline中移除。
NioServerSocketChannel的初始化
//ServerBoostrap.init方法
//这里需重点关注最后为NioServerSocketChannel添加的Channelhandler,当OP_Accept事件发生的时候,NioSocketChannel的初始化会调用这里
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//重点
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
//将我们之前添加的ChannelHanndler添加到SocketChannel的初始化中
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
上述初始化代码中为pipeline添加了一个重要的类 ServerBootstrapAcceptor 这个类是专门负责处理IO监听事件的。它的里面提供了一个重要的方法channelRead,这个方法做了两件事:
- 为接收到的Channel添加我们在服务启动前设置的自定义的handler。
- 将接收到的channel注册到childGroup所在的EventLoopGroup中
NioServerSocketChannel的ChannelHandler
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
//省略部分代码...
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
//省略部分代码...
}
NioServerSocketChannel的注册
/**
* NioServerSocketChannel的注册,这里的group是我们创建ServerBoostrap传入的第一个group即bossGroup
*/
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
//next方法用于顺序返回group中的一个EventLoop,因为我们创建bossGroup的时候只指定了一个EventLoop,所以其实这里只有一个Eventloop可供选择
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
}
//最终的注册会调用channel自身的register方法
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
....
channel.unsafe().register(this, promise);
return promise;
}
//看一下channel自身的register方法,发现其会判断调用当前register的线程是否就是支撑当前EventLoop的线程
//如果是,则直接进行register,否则封装成一个任务,等待EventLoop之后进行调度执行
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
//在register0方法中,channel将自身注册到了EventLoop上Selector上,只不过注册的ops是0,表示只注册,不监听任何事件
//而真正的注册OP_Accept事件发生在,java原生的ServerSocketChannel绑定了localAddress之后,
public final void bind(final SocketAddress localAddress, final ChannelPromise){
boolean wasActive = isActive();
try {
//java原生接口ServerSocketChannle的bind方法
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
//这里修改了NioServerSocketChannle注册的事件,将0->改为了感兴趣的事件即OP_Accpet事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
}
网友评论