上文分析了NioEventLoopGroup和NioEventLoop的关系,本文分析一下Netty在启动server时的初始化流程,仍然从netty server启动的demo例子入手。
public void start() throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind().sync();
System.out.println(NettyServer.class.getName() +
" started and listen on " + channelFuture.channel().localAddress());
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully().sync();
}
}
整个启动过程,首先初始化NioEventLoopGroup线程池,然后初始化引导类,初始化内容稍后分析时讲到时再说,这里主要看serverBootstrap.bind()这个方法,通过进入源码,bind操作主要的代码实现流程在AbstractBootstrap类中实现。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
....
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
....
}
这个doBind方法中最重要的是initAndRegister方法和doBind0方法,下面我们详细分析下这两个方法的主要实现过程。
- initAndRegister方法
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
}
....
ChannelFuture regFuture = config().group().register(channel);
....
return regFuture;
}
这个方法首先通过channelFactory创建一个Channel,然后对这个Channel进行初始化操作,最后为这个Channel分配一个NioEventLoop完成这个Channel上的读写事件。
1.1 Channel的创建
我们首先看下channelFactory是在哪初始化的,通过追溯代码发现是在Demo程序中调用 .channel(NioServerSocketChannel.class)这句代码时完成初始化的。
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
channelFactory的具体实现类是ReflectiveChannelFactory,继续看下这个实现类。
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
从这个实现类中的newChannel方法可以看到是通过反射创建了一个Channel对象,因为我们demo程序中传入的时NioServerSocketChannel类,所以在initAndRegister方法中创建的Channel就是一个NioServerSocketChannel对象。
1.2 init方法
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
这一堆代码中,首先对刚才第一步新建的NioServerSocketChannel完成参数赋值。
然后为这个channel对应的ChannelPipeline中添加了一个ServerBootstrapAcceptor处理器,这个处理器的作用主要是用来处理新建连接的,后面一节会详细讲。
1.3 register方法
这个方法主要是为新建的channel分配一个NioEventLoop,处理这个channel通道上的事件,看下下面这句代码。
final ChannelFuture initAndRegister() {
ChannelFuture regFuture = config().group().register(channel);
}
config().group()返回的是一个bossGroup,bossGroup就是我们在demo程序中第一句创建的NioLoopGroup,这个group主要用来为新来的连接分配NioEventLoop。
register方法的具体实现在MultithreadEventLoopGroup类中,如下:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventExecutor next() {
return chooser.next();
}
分配NioEventLoop的具体实现在next方法,这个分配策略上节内容已经讲过,这里不再描述。
register方法的具体实现在AbstractChannel这个类中:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
(1)通过AbstractChannel.this.eventLoop = eventLoop这句将分配到的NioEventLoop绑定到NioServersocketChannel上。
(2)判断NioEventLoop的线程是否已经启动,如果已经启动,调用register0方法;否则调用 eventLoop.execute方法启动线程,看一下这个方法的具体实现。
@Override
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
}
首先向LinkedBlockingQueue阻塞队列中添加需要执行的任务,然后这里会启动一个线程,这个线程的主要功能是由SingleThreadEventExecutor.this.run()这行代码中完成的,这个方法的具体实现实在NioEventLoop中实现的。
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
这个方法里有一个for循环,最重要的两个方法是processSelectedKeys和runAllTasks,processSelectedKeys主要是监听io读写事件,runAllTasks主要是执行队列中存放的任务的,这两个方法的具体实现先不去研究,只要知道register方法会启动NioEventLoop的线程,此后这个线程会一直监听io事件和执行队列中的任务就可以了。
(3)从上面可以发现真正的注册是在register0这个方法中实现的,下面我们就看下这个方法的具体功能。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
}
这个方法的主要功能就是将1.1 中新建的Channel注册到NioEventLoop的Selector上,这样NioEventLoop的Selector就可以监听到此channel的io事件了。
initAndRegister方法的主要过程就分析完了,其实主要有三点:
(1)创建了一个NioServerSocketChannel对象
(2)为NioServerSocketChannel对应的ChannelPipeLine增加了一个ServerBootstrapAcceptor处理器,用来处理新的连接。
(3) 从NioEventLoopGroup中分配了一个NioEventLoop,用于监听NioServerSocketChannel通道上的io事件。
这里要注意的是NioEventLoopGroup对应的是demo程序中的bossGroup,这个NioEventLoopGroup的NioEventLoop主要就是处理NioServerSocketChannel连接事件,下面一节就讲下客户端连接服务端时,服务端是如何处理这个连接事件的。
网友评论