(*文章基于Netty4.1.22版本)
在上一篇文章Netty源码分析----服务启动之Channel初始化中,一开始给出了一个NIO的demo,然后从构造方法开始分析Netty对应的封装操作流程,而这篇文章,而这篇文章会开始分析,当初始化完成之后,Netty是如何开始接收请求的。
先看下上一篇文章NIO的demo中,是如何接收请求的(只保留run方法,其他忽略)
public class NioServer implements Runnable {
//....
public void run() {
while (true) {
try {
// 无论是否有事件发送,selector每隔1s被唤醒一次
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey selectionKey = null;
while (iterator.hasNext()) {
//....SelectionKey处理
}
} catch (IOException e) {
}
}
}
}
源码分析
Netty如何开始接收请求
那么这次需要看下Netty这部分代码在哪里,且是如何触发的。回顾一下,在Netty源码分析----服务启动之Channel初始化中分析到,Channel会注册到Selector中,而这个Selector是在EventLoop中初始化的,那么也就是说,Selector对Channel的选择应该是在EventLoop中的,由于我们使用的是NioEventLoopGroup,所以创建的EventLoop是NioEventLoop,那么到NioEventLoop中看下,核心代码是run方法:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
在其中我们也能看到NIO使用Selector去获取SelectionKey的影子,但这篇文章不会去详细分析run的逻辑,这个会在以后再进行分析,这里只分析启动的时候触发的流程。
从bind方法开始,有些方法会调用channel.eventLoop().execute这个方法,该方法实际调用的是SingleThreadEventExecutor的execute方法,看下这个的逻辑
public void execute(Runnable task) {
//....
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
一开始inEventLoop为true,那么执行startThread和startTask方法,看下实现:
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
//....
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} finally {
//....
}
}
});
}
使用线程池执行了一个异步的任务,首先会先设置thread = Thread.currentThread()(inEventLoop方法就是比较thread属性是否等于当前线程,由于一开始空,所以为false,还有其他的情况就是当前线程和EventLoop的线程是否同一个),然后调用了run方法,那么这时候就是执行的NioEventLoop的run方法了。
再看下addTask的实现
protected void addTask(Runnable task) {
//....
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
addTask就是将任务放到队列中,顺便的说一下在NioEventLoop的方法里,会分开两部分时间,有一部分时间执行io任务,就是处理连接,read,write等事件,一部分就是处理自定义的任务,就是通过EventLoop的execute方法加入的任务,也就是队列里的任务。
整个流程图如下:
image.png
注:startTread方法不一定是在注册的时候调用的,因为启动该线程的条件是添加任务的时候还未启动,刚好这里注册的时候第一次加入任务,所以在这里启动,如果其他版本在这之前还加入了任务,那么到注册的时候就是直接加入任务,而不需要启动线程
其他初始化
从initAndRegister开始看起,因为上篇文章有些细节没有看完
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
// ....
}
// ....
return regFuture;
}
这个init上篇文章没有分析到,这个实现在ServerBootstrap中
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
代码虽然有点长,但是逻辑不复杂,主要分几点:
- 在Channel中设置属性
- 添加一个Handler到ChannelPipeline中
ChannelPipeline如何添加Handler以及ChannelPipeline的原理,其他文章会进行详细分析,暂时把其当成一个链表就OK,这里就是把一个Handler加入到链表中,此时链表只有这个Handler(事实有一个Head和Tail,忽略)。
这个Handler的实现中又会将其他Handler加入到ChannelPipeline中,那么这个ChannelInitializer类型的Handler是几时调用的呢?再往后看
init方法后面就是register的代码了,这里上篇文章说到他会调用到AbstractUnsafe的register方法,这里也有些细节没有看到,那么进入register方法看下(register又调用了核心逻辑的registe0r方法,直接看register0方法)
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
invokeHandlerAddedIfNeeded这个方法还没分析过,先看下这个方法是不是我们要找的
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
}
如果是第一次注册,那么会调用callHandlerAddedForAllHandlers方法,那么就是说,其实这个方法只会调用一次。再看下callHandlerAddedForAllHandlers上面的注释,大概的意思是
在将Channel注册到Selector上的时候,应该调用在注册前添加的Handler
额,刚好注册前添加的Handler就是上面说的ChannelInitializer,那么进去看看是如何调用的
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
逻辑很简单,这个方法只会在注册完成前调用,调用后就把registered 设置为true了,接着就是调用PendingHandlerCallback的execute方法,这个PendingHandlerCallback是什么呢,先看下Pipeline的addLast中的几句代码
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
// ....
synchronized (this) {
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
//....
}
return this;
}
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
在添加的时候,只有在Channel未注册到Selector的时候,才会对PendingHandlerCallback赋值,其也是个链表。
回到callHandlerAddedForAllHandlers方法,execute方法最终会调用到ChannelInitializer的handlerAdded方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
} finally {
remove(ctx);
}
return true;
}
return false;
}
handlerAdded中会调用到initChannel方法,然后会把该Handler从ChannelPipeline中移除。
其实ChannelInitializer实现了Channel注册后的为每一个Channel添加ChannelHandler的功能,但是其本质也是也是一个ChannelHandler
网友评论