这一章我们将讲解 netty
中真正使用的事件轮询器 NioEventLoop
,通过这一章,你将了解:
netty
是如何通过一个事件轮询器管理多个嵌套字Socket
的通道channel
。- 又是如何即处理通道的
channel
的IO
事件,以及添加事件轮询器上任务的。
一. 选择器 Selector
netty
的事件轮询器也是通过的 java nio
的选择器 Selector
来管理多个嵌套字 Socket
的通道 channel
。
那么选择器 Selector
是如何与通道 channel
,并管理它们的呢。
1.1 注册选择器
在 SelectableChannel
类用 register
方法将通道注册到选择器 Selector
中。
/**
* 将此通道注册到给定的选择器,并返回一个选择键SelectionKey。
*/
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
sel
: 选择器,多个通道channel
可以注册同一个选择器,因此一个选择器就可以管理多个通道了。ops
: 该通道channel
关注的IO
事件类型,分为读事件OP_READ
,写事件OP_WRITE
,连接事件OP_CONNECT
和 接受事件OP_ACCEPT
。可以关注一个或者多个事件。att
: 可以绑定在返回值 选择键SelectionKey
上的值,可以从选择键中获取它。- 返回的选择键
SelectionKey
,它其实代表的是IO
事件类型。
如果当前通道已经注册到给定的选择器 sel
上了,调用这个方法,更改了这个通道关注的 IO
事件类型 ops
,和绑定的对象 att
。
在 AbstractSelectableChannel
实现中我们看到:
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
// 已经注册到给定的 选择器 sel
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
1.2 选择键 SelectionKey
public abstract class SelectionKey {
/**
* 构造该类的实例
*/
protected SelectionKey() { }
// -- Channel and selector operations --
/**
* 返回为其创建键的通道 Channel。
* 即使在该选择键被取消之后,此方法仍将继续返回通道 Channel。
*
* @return This key's channel
*/
public abstract SelectableChannel channel();
/**
* 返回为其创建键的选择器 Selector。
* 这个方法将继续返回选择器,即使在选择键被取消之后。
*
* @return This key's selector
*/
public abstract Selector selector();
/**
* 返回这个键是否有效。
* 键在创建时是有效的,
* 直到它被取消、它的通道关闭或它的选择器关闭后无效。
*/
public abstract boolean isValid();
/**
* 请求取消该选择键的通道与其选择器的注册。
* 方法返回后该选择键将是无效的,并将它添加到其选择器的 cancelled-key的集合。
* 在下一次选择操作期间,该键将从所有选择器的键集中删除。
* 如果此选择键已被取消,则调用此方法没有任何效果。一旦取消,选择键将永远无效。
*
* 这个方法可以在任何时候被调用。
* 选择器的 cancelled-key 集合是进行过同步处理的,
* 因此如果同时调用涉及相同选择器的取消或选择操作,可能会短暂阻塞。
*/
public abstract void cancel();
// -- Operation-set accessors --
/**
* 返回此选择键关注的 IO 事件类型集合。
* 可以保证返回的集合只包含对这个键的通道有效的操作位。
* 这个方法可以在任何时候被调用。
* 它是否阻塞以及阻塞多长时间取决于具体实现。
*
* 如果这个键被取消,则抛出 CancelledKeyException 异常。
*/
public abstract int interestOps();
/**
* 重新设置此选择键关注的 IO 事件类型集合 ops。
* 这个方法可以在任何时候被调用。
* 它是否阻塞以及阻塞多长时间取决于实现。
*
* 如果设置的 ops,不是键的通道所支持的操作,抛出 IllegalArgumentException 异常
* 如果这个键被取消,则抛出 CancelledKeyException 异常。
*/
public abstract SelectionKey interestOps(int ops);
/**
* 返回此键的就绪操作 ready-operation 集合。
* 可以保证返回的集合只包含对这个键的通道有效的操作位。
*
* 如果这个键被取消,则抛出 CancelledKeyException 异常。
*/
public abstract int readyOps();
// -- Operation bits and bit-testing convenience methods --
/**
* 用于读操作的操作位。
*/
public static final int OP_READ = 1 << 0;
/**
* 写操作的操作位。
*/
public static final int OP_WRITE = 1 << 2;
/**
* socket-connect 操作的操作位。
*/
public static final int OP_CONNECT = 1 << 3;
/**
* socket-accept 操作的操作位。
*/
public static final int OP_ACCEPT = 1 << 4;
/**
* 测试此键的通道是否已准备好读取。
*/
public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
}
/**
* 测试此键的通道是否已准备好写入。
*/
public final boolean isWritable() {
return (readyOps() & OP_WRITE) != 0;
}
/**
* 测试此键的通道是否已完成或未能完成其套接字连接操作。
*/
public final boolean isConnectable() {
return (readyOps() & OP_CONNECT) != 0;
}
/**
* 测试此键的通道是否已准备好接受新的套接字连接。
*/
public final boolean isAcceptable() {
return (readyOps() & OP_ACCEPT) != 0;
}
// -- Attachments --
private volatile Object attachment = null;
private static final AtomicReferenceFieldUpdater<SelectionKey,Object>
attachmentUpdater = AtomicReferenceFieldUpdater.newUpdater(
SelectionKey.class, Object.class, "attachment"
);
/**
* 将给定对象附加到此键上。
*/
public final Object attach(Object ob) {
return attachmentUpdater.getAndSet(this, ob);
}
/**
* 返回此键上当前的附加对象
*/
public final Object attachment() {
return attachment;
}
}
选择键 SelectionKey
的功能并不复杂,主要是和通道channel
的IO
事件有关,分为四种:
OP_READ = 1 << 0
读事件。OP_WRITE = 1 << 2
写事件。OP_CONNECT = 1 << 3
连接事件。OP_ACCEPT = 1 << 4
接收事件。
1.3 选择器 Selector
public abstract class Selector implements Closeable {
/**
* 初始化该类的新实例。
*/
protected Selector() { }
/**
* 静态方法创建一个选择器 Selector
*/
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
/**
* 返回这个选择器是否开启。
*/
public abstract boolean isOpen();
/**
* 返回创建次选择器的 provider
*/
public abstract SelectorProvider provider();
/**
* 返回这个选择器的选择键集合。
*
* 不能直接修改这个返回集合。只有当选择键被取消且其通道被注销后,这个选择键才会从集合中删除。
* 任何修改这个集合的尝试都会导致抛出 UnsupportedOperationException 异常。
*
* 这个选择键集合不是线程安全的。
*/
public abstract Set<SelectionKey> keys();
/**
* 返回选择器的选定键集合,即准备好IO 事件的选择键集合。
*
* 可以从选定键集合中删除选择键,但是不能添加键到选定键集合中,
* 任何向选定键集合添加对象的尝试都会导致抛出 UnsupportedOperationException 异常。
*
* 这个选定键集合也不是线程安全的。
*/
public abstract Set<SelectionKey> selectedKeys();
/**
* 立即返回当前选择器中已准备好 IO 事件通道channel 的数量。
* 如果此时选择器中没有准备好IO 事件的通道 channel,
* 那么该方法直接返回 0,不会阻塞当前线程。
*
* 当返回值大于 0 的时候,调用 selectedKeys() 方法,
* 获取已准备好 IO 事件的选择键集合。
*/
public abstract int selectNow() throws IOException;
/**
* 返回当前选择器中已准备好 IO 事件通道 channel 的数量。
* 如果此时选择器中没有准备好IO 事件的通道channel,
* 那么该方法将阻塞当前线程,直到
* 1. 有准备好IO 事件的通道channel
* 2. 使用 Selector.wakeup 唤醒
* 3. 阻塞线程被中断
* 4. 超时时间 timeout 到了
*
* 当返回值大于 0 的时候,调用 selectedKeys() 方法,
* 获取已准备好 IO 事件的选择键集合。
*/
public abstract int select(long timeout)
throws IOException;
/**
* 返回当前选择器中已准备好 IO 事件通道 channel 的数量。
* 如果此时选择器中没有准备好IO 事件的通道channel,
* 那么该方法将阻塞当前线程,直到
* 1. 有准备好IO 事件的通道channel
* 2. 使用 Selector.wakeup 唤醒
* 3. 阻塞线程被中断
*
* 当返回值大于 0 的时候,调用 selectedKeys() 方法,
* 获取已准备好 IO 事件的选择键集合。
*/
public abstract int select() throws IOException;
/**
* 唤醒被 select() 和 select(long timeout) 阻塞的等待线程
*/
public abstract Selector wakeup();
/**
* 关闭当前选择器
*/
public abstract void close() throws IOException;
}
选择器的主要功能就是获取已经准备好的通道 channel
。
1.3.1 获取准备好的通道数量
一共有三个方法 int selectNow()
,int select(long timeout)
和int select()
。
int selectNow()
是立即返回当前准备好的通道数量,如果没有,那就返回0
,不阻塞当前线程。
int select(long timeout)
和int select()
都会阻塞当前线程,直到有准备好的通道,或者阻塞线程被中断,或者其他线程调用选择器的wakeup
方法唤醒。int select(long timeout)
方法还多了一个超时返回。
1.3.2 获取准备好的通道集合
Set<SelectionKey> selectedKeys()
方法返回准备好的选择键集合,通过选择键就可以得到对应的通道 channel
。
1.3.3 唤醒阻塞
Selector wakeup()
可以唤醒被 select()
和 select(long timeout)
阻塞的等待线程。
二. NioEventLoop
中的选择器
2.1 开启选择器
NioEventLoop
的成员变量中有两个选择器实例 unwrappedSelector
和 selector
那是因为
netty
可能会优化选择器的选择键SelectionKey
, 所以就有了两个选择器。
-
unwrappedSelector
通过provider.openSelector()
方法获取的原始选择器。 -
selector
:如果不优化选择键SelectionKey
,selector
就是unwrappedSelector
对象;如果优化选择键,那么selector
就是new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)
对象,包装了unwrappedSelector
对象,并优化了选择键。
这些都是在
openSelector()
方法中实现。
2.2 将通道注册到选择器
-
register(...)
方法/** * 向这个事件循环器的 Selector 注册一个任意的 SelectableChannel (不一定是由Netty创建的)。 * 一旦注册了指定的 SelectableChannel,当 SelectableChannel 准备好时,该事件循环器将执行指定的任务。 */ public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) { ObjectUtil.checkNotNull(ch, "ch"); if (interestOps == 0) { throw new IllegalArgumentException("interestOps must be non-zero."); } if ((interestOps & ~ch.validOps()) != 0) { throw new IllegalArgumentException( "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')'); } ObjectUtil.checkNotNull(task, "task"); if (isShutdown()) { throw new IllegalStateException("event loop shut down"); } if (inEventLoop()) { // 在事件轮询器线程中,直接调用 register0 方法注册 register0(ch, interestOps, task); } else { try { // 调用 submit 方法,确保在轮询器线程中注册 submit(new Runnable() { @Override public void run() { register0(ch, interestOps, task); } }).sync(); } catch (InterruptedException ignore) { // 即使被中断了,我们也会调度它,所以只需将线程标记为中断。 Thread.currentThread().interrupt(); } } }
确保在轮询器线程中注册
-
register0(...)
方法private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) { try { // 将通道 ch 注册到选择器 unwrappedSelector 中 ch.register(unwrappedSelector, interestOps, task); } catch (Exception e) { throw new EventLoopException("failed to register a channel", e); } }
调用
SelectableChannel
的register
方法,将通道注册到选择器unwrappedSelector
中去。
2.3 获取准备好的通道数量
- 立即获取,不阻塞
/** * 返回注册在该 Selector 上的已经准备好进行I/O操作的通道 channel 的数量。 * 如果还没有准备好的通道 channel ,那么直接返回 0,不会阻塞当前线程。 */ int selectNow() throws IOException { return selector.selectNow(); }
- 阻塞获取
private int select(long deadlineNanos) throws IOException { // 如果截止时间 deadlineNanos 是NONE(无限大) // 那么就使用 selector.select() 方法,不设置超时, // 一直阻塞等待,直到有注册在该 selector 上通道 channel 已经准备好进行I/O操作, // 才停止阻塞,返回准备好I/O操作 channel 的数量。 if (deadlineNanos == NONE) { return selector.select(); } // 计算调用 selector.select(timeoutMillis) 的超时阻塞等待时间。 // 如果截止时间在5微秒内,超时时间将为0 long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }
- 再次获取
/** * 重新获取 IO 事件,即再次调用 selector.selectNow(), 不阻塞线程 */ private void selectAgain() { needsToSelectAgain = false; try { selector.selectNow(); } catch (Throwable t) { logger.warn("Failed to update SelectionKeys.", t); } }
三. 事件轮询
3.1 run
方法
事件轮询器如何实现事件轮询的,就是主要看它的 run
方法实现:
@Override
protected void run() {
int selectCnt = 0;
// 必须使用死循环不断进行事件轮询,获取任务和通道的 IO 事件
for (;;) {
try {
int strategy;
try {
/**
* 返回处理策略,就分为两种:
* 有任务 hasTasks() == true,就不能等待IO事件了,先直接调用 selectNow() 方法,
* 获取当前准备好IO 的通道channel 的数量(0 表示一个都没有),处理 IO 事件 和任务。
*
* 没有任务 hasTasks() == false,返回 SelectStrategy.SELECT (是负数),
* 没有要及时处理的任务,先阻塞等待 IO 事件
*/
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
// 返回下一个计划任务准备运行的截止时间纳秒值
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// 返回 -1,说明没有下一个计划任务,
// 将 curDeadlineNanos 设置为 NONE,
// 调用 selector.select 方法时,就没有超时,
// 要无限等待了,除非被唤醒或者有准备好的 IO 事件。
curDeadlineNanos = NONE;
}
// 设置 超时等待时间
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// 当前没有任务,那么就通过 selector 查看有没有 IO 事件
// 并设置超时时间,超时时间到了那么就要执行计划任务了
// 如果 curDeadlineNanos 是 NONE,就没有超时,无限等待。
strategy = select(curDeadlineNanos);
}
} finally {
// 这个更新只是为了帮助阻止不必要的选择器唤醒,
// 所以使用lazySet是可以的(没有竞争条件)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// 如果我们在这里接收到IOException,那是因为Selector搞错了。
// 让我们重新构建选择器并重试。
// https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
/**
* 代码走到这里,
* 要么有 IO 事件,即 strategy >0
* 要么就是有任务要运行。
* 如果两个都不是,那么就有可能是 JDK 的 epoll 的空轮询 BUG
*/
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
// 如果 ioRatio
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// 确保运行了所有待执行任务,包括当前时间已经过期的计划任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
// strategy > 0 说明有 IO 事件,
// 那么需要调用 processSelectedKeys() 方法,执行 IO 时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 计算 IO 操作花费的时间
final long ioTime = System.nanoTime() - ioStartTime;
// 按照比例计算可以运行任务的超时时间 ioTime * (100 - ioRatio) / ioRatio,
// 超时时间到了,即使还有任务没有运行,也直接返回了,等下一个周期在运行这些任务
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// strategy == 0 说明没有 IO 事件,不用处理 IO 了
// 调用 runAllTasks(0) 方法,超时时间为0,这将运行最小数量的任务
ranTasks = runAllTasks(0);
}
if (ranTasks || strategy > 0) {
// 要么有任务运行,要么有 IO 事件处理
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) {
// 即没有任务运行,也没有IO 事件处理,就有可能是 JDK 的 epoll 的空轮询 BUG
// 调用 unexpectedSelectorWakeup(selectCnt) 方法处理。
// 可能会重新建立 Select
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
// 如果事件轮询器开始 shutdown,就要关闭 IO 资源
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
我们知道事件轮询器要处理两种事件:通道的 IO
事件 和 任务(包括计划任务和待执行任务),那么就要合理分配时间:
-
选择策略
selectStrategy
,根据有无待执行任务(hasTasks()
) 进行划分:- 有待执行任务,那么不用等待
IO
事件,先直接调用selectNow()
方法,获取当前准备好IO
事件的通道channel
,然后处理IO
事件和待执行任务。 - 没有待执行任务,那么就需要阻塞等待准备好
IO
事件的通道;先获取下一个计划任务的截止时间curDeadlineNanos
,调用select(curDeadlineNanos)
方法。
- 有待执行任务,那么不用等待
-
开始处理
IO
事件和待执行任务这里面有一个非常重要的属性
ioRatio
,它表示事件循环中,处理IO
事件所占的时间比例。这个值越大,处理非IO
事件(待执行任务)的时间就越少;但是如果值变成100
,就会禁用该特性,事件循环将不会尝试平衡IO
事件和非I/O事件的时间。-
ioRatio == 100
,如果有IO
事件(strategy > 0
),通过processSelectedKeys()
处理IO
事件,最后调用runAllTasks()
运行全部的待执行任务。 - 然后判断
IO
事件(strategy > 0
),通过processSelectedKeys()
处理IO
事件,计算处理IO
事件的花费的时间ioTime
,根据这个时间,计算出执行非IO
事件(即待执行任务)最多花费的时间ioTime * (100 - ioRatio) / ioRatio
,这样就可以控制运行任务的时间了。 - 没有
IO
事件(strategy == 0
),调用runAllTasks(0)
方法,运行最小数量的任务。 - 最后需要考虑,如果这次被唤醒,即没有任务运行,也没有
IO
事件处理,那么就有可能是JDK
的epoll
的空轮询BUG
;需要重新注册选择器。
-
3.2 处理 IO
事件
-
processSelectedKeys
/** * 处理 IO 事件 */ private void processSelectedKeys() { if (selectedKeys != null) { // 如果是优化的 Select, 调用 processSelectedKeysOptimized 方法 processSelectedKeysOptimized(); } else { // 如果没有优化, // 直接调用 selector.selectedKeys() 获取IO事件的channel processSelectedKeysPlain(selector.selectedKeys()); } }
-
processSelectedKeysOptimized
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // 将数组中的条目空出来,以便在通道关闭后对其进行GC // See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
-
processSelectedKeysPlain
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { // 检查集合是否为空,如果为空,则直接返回。 // See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; } // 得到 通道channel IO事件 SelectionKey 的迭代器 Iterator<SelectionKey> i = selectedKeys.iterator(); // 循环遍历, 这里使用 for 死循环, // 因为可能会再次调用 selector.selectNow() 获取IO 事件 // 需要继续处理这些 IO 事件 for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); // 必须调用 remove() 方法, // 将这个 SelectionKey 从迭代器中移除 i.remove(); if (a instanceof AbstractNioChannel) { // 如果 AbstractNioChannel 事件 processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; // 如果 NioTask 事件 processSelectedKey(k, task); } // 没有 IO 事件了,跳出循环 if (!i.hasNext()) { break; } if (needsToSelectAgain) { // 如果needsToSelectAgain == true, 需要重新获取IO事件, selectAgain(); // 再次获取 IO 事件的 selectedKeys selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { // 没有新的 IO 事件,就直接返回。 break; } else { // 重新获取新的迭代器, 以避免ConcurrentModificationException i = selectedKeys.iterator(); } } } }
-
processSelectedKey
/** * 处理通道 AbstractNioChannel 的IO事件 */ private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 如果 SelectionKey 无效 if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // 如果通道实现因为没有事件循环器而抛出异常,则忽略此异常, // 因为我们只是试图确定ch是否注册到该事件循环器,从而有权关闭ch。 return; } // 只有当ch仍然注册到这个EventLoop时才关闭ch。 // ch可能已经从事件循环中注销,因此SelectionKey可以作为注销过程的一部分被取消, // 但通道仍然健康,不应该关闭。 // See https://github.com/netty/netty/issues/5125 if (eventLoop == this) { // 关闭这个通道 channel unsafe.close(unsafe.voidPromise()); } return; } try { // 获取 IO 事件类型 int readyOps = k.readyOps(); // 首先判断是不是连接的IO事件 OP_CONNECT // 在尝试触发read(…)或write(…)之前, // 我们首先需要调用finishConnect(), // 否则NIO JDK通道实现可能抛出 NotYetConnectedException 异常。 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); // 删除OP_CONNECT,否则Selector.select(..)将始终返回而不阻塞 ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // 首先处理写事件 OP_WRITE,因为我们可以写一些队列缓冲区,从而释放内存。 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 调用forceFlush,即使没有东西可写,它也会清除OP_WRITE ch.unsafe().forceFlush(); } // 最后处理读事件 // 还要检查 readOps 是否为0,以解决可能导致旋转循环的JDK错误 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } /** * 处理 NioTask 任务, */ private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) { int state = 0; try { task.channelReady(k.channel(), k); state = 1; } catch (Exception e) { k.cancel(); invokeChannelUnregistered(task, k, e); state = 2; } finally { switch (state) { case 0: k.cancel(); invokeChannelUnregistered(task, k, null); break; case 1: if (!k.isValid()) { // Cancelled by channelReady() invokeChannelUnregistered(task, k, null); } break; default: break; } } }
四. 总结
NioEventLoop
基本逻辑已经说清楚了,我们知道它是如何平衡处理 IO
事件和 待执行的任务的。
网友评论