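Series
Netty source analysis - Bootstrap (server side)
Netty source analysis - Bootstrap (client side)
Netty source analysis - ChannelHandler
Netty source analysis - EventLoop class relationships
Netty source analysis - register
Netty source analysis - NioEventLoop event handling
Netty source analysis - the accept process
Netty source analysis - ByteBuf
Netty source analysis - TCP sticky packets and packet splitting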
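Introduction
- This article analyzes two things: how the NioEventLoop event-handling thread is started, and how NioEventLoop processes I/O events.
- Thread startup: the first time a NioSocketChannel is registered with a NioEventLoop, eventLoop.execute() is called from outside the event-loop thread. Execution then follows NioEventLoop#execute => NioEventLoop#startThread => NioEventLoop#doStartThread (all three inherited from SingleThreadEventExecutor) and ends by submitting an event-handling task to the NioEventLoop's executor. That task runs the loop that handles I/O events and queued tasks such as channel registration.
- Event handling: the event-handling thread obtains the ready selectedKeys and processes them one by one. Each SelectionKey carries its channel object as an attachment, so processing continues in that channel.
NioEventLoop thread startup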
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected abstract class AbstractUnsafe implements Unsafe {
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            // some code omitted
            AbstractChannel.this.eventLoop = eventLoop;
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // Calling SingleThreadEventExecutor#execute starts the worker thread if it is not running yet.
                    // This Runnable is the registration task queued on the event loop; the worker thread runs it later.
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    // error handling omitted
                }
            }
        }
    }
}
- AbstractChannel.AbstractUnsafe#register registers the channel with the eventLoop, which here is a NioEventLoop.
- eventLoop.execute() invokes NioEventLoop's execute method, which is inherited from the parent class SingleThreadEventExecutor.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private volatile Thread thread;
    private final Queue<Runnable> taskQueue;

    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            // Already on the event-loop thread: just queue the task
            addTask(task);
        } else {
            // Start the worker thread that belongs to this NioEventLoop (only the first call has an effect)
            startThread();
            // Queue the task
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
    }

    // Adds the task to taskQueue
    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }

    // Starts the thread that belongs to this NioEventLoop
    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

    // Submits the event-loop task to the executor, which starts the NioEventLoop's task-processing thread
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Call run() to process NioEventLoop events
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // code omitted
                }
            }
        });
    }
}
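- SingleThreadEventExecutor#execute calls two methods: addTask and, when the caller is not the event-loop thread, startThread.
- addTask adds a task to the NioEventLoop's taskQueue, for example the task that registers a Channel with the NioEventLoop.
- startThread calls doStartThread to start the NioEventLoop's task-processing thread. The CAS on the state field ensures this happens only once, and doStartThread does it by submitting a Runnable through the NioEventLoop's executor.execute().
- Inside that Runnable's run() method, SingleThreadEventExecutor.this.run() is called, which enters the event-handling loop.
- SingleThreadEventExecutor.this refers to the NioEventLoop instance, so the call dispatches to NioEventLoop#run.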
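The lazy start described in the bullets above can be sketched in isolation. The class below is a hypothetical, heavily simplified stand-in (LazyStartLoop and all of its members are invented for illustration and are not Netty code); it only shows the pattern of starting the worker once via compareAndSet on the first execute() call and draining a task queue afterwards.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for the lazy-start pattern; not Netty code. */
final class LazyStartLoop {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger threadsStarted = new AtomicInteger();
    // Assumption: one new daemon thread per submitted Runnable.
    private final Executor executor = r -> {
        Thread t = new Thread(r, "lazy-loop");
        t.setDaemon(true);
        t.start();
    };
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // only the first caller actually starts the worker
        }
        taskQueue.offer(task);
    }

    private void startThread() {
        // The CAS guarantees that the loop task is submitted at most once.
        if (state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            executor.execute(() -> {
                thread = Thread.currentThread();
                threadsStarted.incrementAndGet();
                run();
            });
        }
    }

    // Loop body: this sketch only drains tasks; the real loop also polls a selector.
    private void run() {
        try {
            for (;;) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int threadsStarted() {
        return threadsStarted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LazyStartLoop loop = new LazyStartLoop();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            loop.execute(done::countDown);
        }
        boolean finished = done.await(5, TimeUnit.SECONDS);
        System.out.println("finished=" + finished + ", threadsStarted=" + loop.threadsStarted());
    }
}
```

Three tasks are submitted from the main thread, yet only the first execute() call wins the CAS, so a single worker thread ends up running all of them.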
EventLoop event handling
NioEventLoop#run
public final class NioEventLoop extends SingleThreadEventLoop {
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // Handle I/O events first
                        processSelectedKeys();
                    } finally {
                        // Then run all queued tasks, with no time limit
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        // Handle I/O events first
                        processSelectedKeys();
                    } finally {
                        // Then run queued tasks within a time budget derived from the I/O time
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}
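- In NioEventLoop#run, processSelectedKeys handles the NioEventLoop's I/O events.
- In NioEventLoop#run, runAllTasks runs the NioEventLoop's queued tasks. When ioRatio is below 100, the time allowed for tasks is derived from the time just spent on I/O: ioTime * (100 - ioRatio) / ioRatio.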
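To make the ioRatio arithmetic from the run() excerpt concrete, here is a small worked example. IoRatioBudget and taskBudgetNanos are hypothetical names introduced for this sketch; only the formula itself comes from the excerpt, where the ioRatio == 100 case is handled separately and has no time budget.

```java
/** Hypothetical helper that isolates the budget formula used in the run() excerpt. */
final class IoRatioBudget {
    /** Returns the time allowed for tasks: ioTime * (100 - ioRatio) / ioRatio. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // The excerpt treats ioRatio == 100 as a separate branch without a budget.
            throw new IllegalArgumentException("ioRatio must be in 1..99 for this formula");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // assume 8 ms were spent in processSelectedKeys()
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> task budget ns=" + taskBudgetNanos(ioTime, ratio));
        }
    }
}
```

With 8 ms of I/O time, an ioRatio of 50 gives tasks the same 8 ms, 80 gives them 2 ms, and 20 gives them 32 ms, so a higher ioRatio shifts the loop's time towards I/O.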
NioEventLoop#processSelectedKeys
public final class NioEventLoop extends SingleThreadEventLoop {
    private SelectedSelectionKeySet selectedKeys;

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            selectedKeys[i] = null;
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            if (needsToSelectAgain) {
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }
                selectAgain();
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            if (!i.hasNext()) {
                break;
            }
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();
                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
}
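- NioEventLoop#processSelectedKeys handles the NioEventLoop's events and has two paths: processSelectedKeysOptimized and processSelectedKeysPlain.
- NioEventLoop#processSelectedKeysOptimized is used when the selector's key set has been replaced by the optimized SelectedSelectionKeySet (selectedKeys != null).
- NioEventLoop#processSelectedKeysPlain is used for a selector that was not optimized and iterates over selector.selectedKeys().
- NioEventLoop#processSelectedKey does the actual event handling. The attachment of the SelectionKey passed in is the AbstractNioChannel object.
- A single Selector may have thousands of Channels registered. Through the SelectionKey and its attachment, the loop retrieves exactly the channel on which an event occurred, then obtains that channel's unsafe object for the next processing step.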
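The key-plus-attachment lookup described above can be tried with plain java.nio, without Netty. In this minimal sketch the Handler class is a hypothetical stand-in for AbstractNioChannel, and the loopback server with an ephemeral port is an assumption made only so the example is self-contained.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class AttachmentDemo {
    /** Hypothetical handler type standing in for AbstractNioChannel. */
    static final class Handler {
        final String name;

        Handler(String name) {
            this.name = name;
        }
    }

    /** Registers a server channel with an attachment and returns the name found on the ready key. */
    static String acceptOnce() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            // The third argument becomes the attachment of the resulting SelectionKey.
            server.register(selector, SelectionKey.OP_ACCEPT, new Handler("acceptor"));

            // A blocking client connect makes the server key ready for OP_ACCEPT.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(5000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    Object a = key.attachment();
                    if (key.isAcceptable() && a instanceof Handler) {
                        return ((Handler) a).name;
                    }
                }
            }
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("ready key belongs to: " + acceptOnce());
    }
}
```

The selector only reports which keys are ready; the attachment is what maps a ready key back to the object that knows how to handle it, with no extra lookup table.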
NioEventLoop#processSelectedKey
public final class NioEventLoop extends SingleThreadEventLoop {
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // code omitted
        try {
            int readyOps = k.readyOps();
            // Handle the OP_CONNECT event
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
            // Handle the OP_WRITE event
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            // Handle the OP_READ and OP_ACCEPT events
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
}
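- NioEventLoop#processSelectedKey dispatches on the type of the ready event.
- The event types are OP_CONNECT (connection established), OP_WRITE (writable), OP_READ (readable) and OP_ACCEPT (new connection to accept).
- AbstractNioChannel#unsafe returns an AbstractNioChannel.NioUnsafe object, and that unsafe object continues handling the event.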
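Because readyOps is a bit mask, one key can trigger several branches in a single pass. The sketch below reproduces only the bit tests from the excerpt; ReadyOpsDispatch and actionsFor are hypothetical names, and the returned strings merely label which unsafe method the excerpt would call.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that mirrors the bit tests in processSelectedKey. */
final class ReadyOpsDispatch {
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // As in the excerpt, readyOps == 0 also takes the read branch.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));
        System.out.println(actionsFor(0));
    }
}
```

Note that OP_READ and OP_ACCEPT share one branch: for a server channel, "reading" means accepting a connection, which is why both end up in unsafe.read().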
SingleThreadEventExecutor#runAllTasks
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        // Take a pending task from the NioEventLoop's taskQueue
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            // Run the task safely (exceptions are caught)
            safeExecute(task);
            runTasks ++;
            // Check the deadline only once every 64 tasks
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;
        do {
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }
}
- runAllTasks takes pending tasks from the NioEventLoop's taskQueue and executes them.
- runAllTasks is still executed by the NioEventLoop's event-handling thread.
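The timed variant of runAllTasks reads the clock only once every 64 tasks ((runTasks & 0x3F) == 0), so a deadline can be overshot by up to 63 tasks in exchange for fewer nanoTime() calls. The sketch below isolates that loop shape; BudgetedDrain and runTasks are hypothetical names, System.nanoTime() is used in place of ScheduledFutureTask.nanoTime(), and the exception guarding done by safeExecute is left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified version of a time-budgeted task drain. */
final class BudgetedDrain {
    /** Runs queued tasks until the queue is empty or the deadline check fires; returns the count. */
    static int runTasks(Queue<Runnable> queue, long timeoutNanos) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = System.nanoTime() + timeoutNanos;
        int ran = 0;
        for (;;) {
            task.run();
            ran++;
            // Look at the clock only on every 64th task.
            if ((ran & 0x3F) == 0 && System.nanoTime() - deadline >= 0) {
                break;
            }
            task = queue.poll();
            if (task == null) {
                break;
            }
        }
        return ran;
    }

    public static void main(String[] args) {
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < 100; i++) {
            queue.add(() -> { });
        }
        int ran = runTasks(queue, 0L); // zero budget: stops at the first deadline check
        System.out.println("ran=" + ran + ", remaining=" + queue.size());
    }
}
```

Even with a zero budget the loop runs 64 of the 100 tasks before it first checks the deadline, which illustrates why the budget is approximate rather than strict.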
NioEventLoop#openSelector
public final class NioEventLoop extends SingleThreadEventLoop {
    private SelectedSelectionKeySet selectedKeys;

    private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });
        if (!(maybeSelectorImplClass instanceof Class) ||
                // ensure the current selector implementation is what we can instrument.
                !((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", selector, t);
            }
            return selector;
        }
        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                    selectedKeysField.setAccessible(true);
                    publicSelectedKeysField.setAccessible(true);
                    selectedKeysField.set(selector, selectedKeySet);
                    publicSelectedKeysField.set(selector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                } catch (RuntimeException e) {
                    // The handling in this branch is elided in this excerpt; as written the
                    // method would not compile, so the exception is returned here to be logged below.
                    return e;
                }
            }
        });
        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
        } else {
            selectedKeys = selectedKeySet;
            logger.trace("instrumented a special java.util.Set into: {}", selector);
        }
        return selector;
    }
}
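- Create an instance of SelectedSelectionKeySet named selectedKeySet.
- Use reflection to replace the selectedKeys and publicSelectedKeys fields of the selector (a sun.nio.ch.SelectorImpl) with selectedKeySet.
- The last step is also important: selectedKeys = selectedKeySet.
- From this point on the selector no longer hands back its original HashSet of ready keys. The more efficient selectedKeySet takes its place, and it is the same object as the selectedKeys field used by processSelectedKeys. To process ready events the loop reads directly from selectedKeys, and when the Selector detects ready events it puts the keys directly into selectedKeys.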
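Why an array-backed set can beat a HashSet here is easier to see in code. The class below is a simplified sketch written for this article, not Netty's SelectedSelectionKeySet: ArrayBackedSet and reset() are hypothetical names, and the sketch assumes the caller adds each element at most once between resets, so it skips hashing and duplicate checks entirely.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical array-backed Set: add() appends, iteration walks the array in insertion order. */
final class ArrayBackedSet<E> extends AbstractSet<E> {
    private Object[] items = new Object[16];
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == items.length) {
            items = Arrays.copyOf(items, size << 1); // double the capacity when full
        }
        items[size++] = e; // no hashing, no duplicate check (see the assumption above)
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    /** Clears the set so the same array can be reused for the next round. */
    void reset() {
        Arrays.fill(items, 0, size, null);
        size = 0;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int i;

            @Override
            public boolean hasNext() {
                return i < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (i >= size) {
                    throw new NoSuchElementException();
                }
                return (E) items[i++];
            }
        };
    }

    public static void main(String[] args) {
        ArrayBackedSet<String> keys = new ArrayBackedSet<>();
        keys.add("key-1");
        keys.add("key-2");
        for (String k : keys) {
            System.out.println(k);
        }
        keys.reset();
        System.out.println("size after reset: " + keys.size());
    }
}
```

Because the class still extends AbstractSet, it can be assigned to a field typed as Set, which is what makes the reflective swap in openSelector possible, while add() becomes a plain array append.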