4.4 线程
4.4.1 AbstractExecutorService
AbstractExecutorService是JDK并发包中的类,实现了ExecutorService中的submit()和invoke***()方法,关键实现是其中的newTaskFor()方法,使用FutureTask包装一个Ruannble对象和结果或者一个Callable对象。注意,这个方法是一个protected方法,子类中可以覆盖这个实现。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
4.4.2 AbstractEventExecutor
AbstractEventExecutor继承自AbstractExecutorService并实现了EventExecutor接口,该类中只实现了一些简单的方法:
public EventExecutor next() {
return this;
}
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
public Future<?> shutdownGracefully() {
return shutdownGracefully(2, 15, TimeUnit.SECONDS);
}
next()方法在线程池的讲解中已经接触过,功能是选择线程池中的一个线程,将AbstractEventExecutor看为只有一个线程的线程池,所以next()返回它本身。inEventLoop()和shutdownGracefully()方法都调用它的有参方法,我们将在其子类实现中详细介绍,这里我们先了解其功能即可。inEventLoop()的功能使判断当前线程是否是EventExecutor原生线程,shutdownGracefully()即优雅关闭。
AbstractEventExecutor类中有四个创建异步结果的方法,实现类似如下:
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}
AbstractEventExecutor类覆盖了父类的newTaskFor()方法:
@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new PromiseTask<T>(this, callable);
}
使用Netty的PromiseTask代替JDK的FutureTask,其中的差别,我们在下一节讲述。此外,还用Netty的Future对象覆盖了subimt()方法的返回值(原本为JDK的Future).
4.4.3 AbstractScheduledEventExecutor
从名字可以看出,AbstractScheduledEventExecutor类是关于Schedule的实现。如果要调度一堆任务,那么首先要有存放任务的容器,Netty中使用队列:
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
该调度任务队列是一个优先级队列,并使用了延迟加载。其核心的调度方法实现如下:
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task); // 原生线程直接向任务队列添加
} else {
execute(new Runnable() { // 其他线程则提交一个添加调度任务的任务
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
可以看出实现很简单,就是向调度任务队列中添加一个任务,为了弄明白具体的调度过程,我们需要明白ScheduledFutureTask,下面我们将详细介绍。
ScheduledFutureTask
首先看其中的静态字段和静态方法:
// 调度任务ID生成器
private static final AtomicLong nextTaskId = new AtomicLong();
// 调度相对时间起点
private static final long START_TIME = System.nanoTime();
// 获取相对的当前时间
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
// 获取相对的截止时间
static long deadlineNanos(long delay) {
return nanoTime() + delay;
}
注意:Netty使用了相对时间调度,时间起点为ScheduledFutureTask类第一次被类加载器加载的时间。
然后我们看其中的私有字段:
// 调度任务ID
private final long id = nextTaskId.getAndIncrement();
// 调度任务截止时间即到了改时间点任务将被执行
private long deadlineNanos;
// 任务时间间隔
private final long periodNanos;
这里的periodNanos字段还兼有标记的功能,0--表示调度任务不重复,>0--表示按固定频率重复(at fixed rate),<0--表示按固定延迟重复(with fixed delay)。这不是一个好的设计,但也没有暴露给用户程序员,算一个折中处理。
接着我们看关键的run()方法:
@Override
public void run() {
assert executor().inEventLoop();
try {
if (periodNanos == 0) { // 普通不重复的调度任务直接执行
if (setUncancellableInternal()) {
V result = task.call();
setSuccessInternal(result);
}
} else {
if (!isCancelled()) { // 重复的任务可能被取消
task.call();
if (!executor().isShutdown()) { // 线程已经关闭则不再添加新任务
long p = periodNanos;
if (p > 0) {
deadlineNanos += p; // 按固定频率重复
} else {
deadlineNanos = nanoTime() - p; // 按固定延迟重复
}
if (!isCancelled()) {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
scheduledTaskQueue.add(this); // 下一个最近的重复任务添加到任务队列
}
}
}
}
} catch (Throwable cause) {
setFailureInternal(cause);
}
}
代码中的注释很好的解释了一个调度任务的执行过程,可能你会对按固定延迟重复的任务有疑问,即:
deadlineNanos = nanoTime() - p;
其中nanoTime()指当前时间(注意是相对时间),由于p是负值-p等价于:当前时间+delay时间。由于ScheduledFutureTask是添加到PriorityQueue中的对象,我们再看看其中的compareTo()方法:
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
从代码可以看出,优先级队列的出队顺序是:截止时间最近的先出队,如果截止时间相同则ID小的先出队。
分析完ScheduledFutureTask类,我们接着分析AbstractScheduledEventExecutor类中剩下的方法,由于其中的方法实现简单明了,不再列出代码实现,只列出其方法签名:
// 返回当前时间(相对时间)
protected static long nanoTime() {
return ScheduledFutureTask.nanoTime(); // 使用ScheduledFutureTask的相对时间
}
// 取得并移除截止时间大于nanoTime的下一个调度任务
protected final Runnable pollScheduledTask(long nanoTime);
// 取得距离下一个调度任务执行的间隔时间
protected final long nextScheduledTaskNano();
// 取得但并不移除下一个调度任务
final ScheduledFutureTask<?> peekScheduledTask();
// 是否有将要执行的调度任务
protected final boolean hasScheduledTasks();
// 删除一个调度任务
final void removeScheduled(final ScheduledFutureTask<?> task);
4.4.4 SingleThreadEventExecutor
SingleThreadEventExecutor类从名字可以看出,它是一个单线程的Executor实现。在介绍之前,我们先看Netty定义的线程状态:
private static final int ST_NOT_STARTED = 1; // 没有启动
private static final int ST_STARTED = 2; // 启动
private static final int ST_SHUTTING_DOWN = 3; // 正在关闭
private static final int ST_SHUTDOWN = 4; // 关闭
private static final int ST_TERMINATED = 5; // 终止
需要注意的有两点:
(1).本类的实现中线程采用延迟启动(lazy start),只有当提交第一个任务时线程才启动,从而节省资源。
(2).当调用shutdownGracefully()时,线程状态改变为ST_SHUTTING_DOWN;调用shutdown()时,线程状态改变为ST_SHUTDOWN。
明白了线程状态,我们首先看一下类中的字段:
private final EventExecutorGroup parent; // 该Executor所属的线程池
private final Queue<Runnable> taskQueue; // 任务队列
private final Thread thread; // 改Executor所属的线程
private final ThreadProperties threadProperties; // 线程属性值
private final Semaphore threadLock = new Semaphore(0); // 一个信号量,注意初始值为0
private final Set<Runnable> shutdownHooks = new LinkedHashSet<~>(); // 线程关闭钩子任务
private final boolean addTaskWakesUp; // 添加任务时是否唤醒线程
private final int maxPendingTasks; // 任务队列大小即未执行的最大任务数
private final RejectedExecutionHandler rejectedExecutionHandler; // 队列满时的阻止器
private long lastExecutionTime; // 上一次执行时间
private volatile int state = ST_NOT_STARTED; // 线程状态,注意该字段由STATE_UPDATER修改
// 线程终止异步结果
private final Promise<?> terminationFuture = new DefaultPromise<Void>(
GlobalEventExecutor.INSTANCE);
关于SingleThreadEventExecutor的构造方法,我们摘选下面的关键代码:
thread = threadFactory.newThread(() -> {
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); // 这是一个模板方法
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// shutdown
}
});
taskQueue = newTaskQueue(); // 这里使用该方法是为了子类可以优化
其中使用了模板方法run(),由子类负责实现。taskQueue也由一个方法实例,主要是给子类提供一个优化的机会,关于Netty的优化,我们以后将专门讲解,这里taskQueue的默认实现是LinkedBlockingQueue。
下面我们分析一个关键方法runAllTasks(long timeoutNanos),其功能是用给定的timeoutNanos时间执行任务队列中的任务,代码如下:
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue(); // 将调度任务队列中到期的任务移到任务队列
Runnable task = pollTask(); // 从任务队列头部取出一个任务
if (task == null) {
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // 执行截止时间
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
runTasks ++;
// 每执行64个任务检查时候时间已到截止时间,0x3F = 64-1
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) { // 没有任务则退出
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// 更新上一次执行时间
this.lastExecutionTime = lastExecutionTime;
return true;
}
我们再看一下fetchFromScheduledTaskQueue()方法,它从调度任务队列取出所有到期的调度任务并加入到任务队列,除非任务队列满,代码如下:
private boolean fetchFromScheduledTaskQueue() {
// 等价于ScheduledFutureTask.nanoTime()
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// 任务队列已满,则重新添加到调度任务队列
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
runAllTasks()还有一个无参方法,其功能将所有到期的调度任务从调度任务队列移入任务队列,并执行任务队列中的所有任务(包括非调度任务),我们不再列出代码。
SingleThreadEventExecutor类是一个通用框架,不仅可以执行异步任务,也能执行同步任务,下面我们分析其中用于执行同步任务的关键方法takeTask(),其功能是取出任务队列头部的任务,如果没有任务则会一直阻塞,代码如下:
protected Runnable takeTask() {
assert inEventLoop();
if (!(taskQueue instanceof BlockingQueue)) { // 任务队列必须是阻塞队列
throw new UnsupportedOperationException();
}
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
// 取得调度任务队列的头部任务,注意peek并不移除
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) { // 没有调度任务
Runnable task = null;
try {
task = taskQueue.take(); // 取得并移除任务队列的头部任务,没有则阻塞
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
long delayNanos = scheduledTask.delayNanos(); // 调度任务的到期时间间隔
Runnable task = null;
if (delayNanos > 0) {
try { // 调度任务未到期,则从任务队列取一个任务,可能为null
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
return null;
}
}
// 注意这里执行有两种情况:1.任务队列中没有待执行任务,2.调度任务已到期
if (task == null) {
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}
if (task != null) {
return task;
}
}
}
}
特别关注一下15行代码,这里有一个WAKEUP_TASK,它是一个标记任务。使用这个标记任务是为了线程能正确退出,当线程需要关闭是,如果线程在take()方法上阻塞,就需要添加一个标记任务WAKEUP_TASK到任务队列,是线程从take()返回,从而正确关闭线程。
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {
// 非本类原生线程或者本类原生线程需要关闭时,添加一个标记任务使线程从take()返回。
// offer失败表明任务队列已有任务,从而线程可以从take()返回故不处理
taskQueue.offer(WAKEUP_TASK);
}
}
本类覆盖了execute()方法,在这里实现了线程的延迟启动(lazy start),代码如下:
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) { // 原生线程直接添加
addTask(task);
} else { // 外部线程启动线程后添加
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject(); // 原生线程关闭时则阻止添加,抛出异常
}
}
// 是否唤醒线程,addTaskWakesUp由构造方法配置,wakesUpForTask()可由子类覆盖,默认唤醒
// 这里这个参数值addTaskWakesUp和其说明有出入,现在false反而唤醒?
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
Netty线程关闭的代码较为繁琐,我们先不列出,以后专门使用一节讲述。此外,本类中其他需要说明的方法,我们列出方法签名和说明:
// 取得并移除任务队列的头部任务,忽略WAKEUP_TASK标记任务
protected Runnable pollTask();
// 取得任务队列的头部任务
protected Runnable peekTask();
// 任务队列是否有任务即是否为空
protected boolean hasTasks();
// 挂起的任务数即任务队列大小
public int pendingTasks();
// 添加一个任务,线程关闭时抛出异常
protected void addTask(Runnable task);
final boolean offerTask(Runnable task);
// 移除一个任务
protected boolean removeTask(Runnable task);
// 下一个调度任务到期的时间间隔
protected long delayNanos(long currentTimeNanos);
// 判断线程是否为该类的原生线程
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
4.4.5 SingleThreadEventLoop
SingleThreadEventLoop终于与Channel取得联系,其中最重要的便是register()方法,功能是将一个Channel对象注册到EventLoop上,其最终实现委托Channel对象的Unsafe对象完成,关于Unsafe我们将在下一章介绍。其代码实现如下:
@Override
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
// 代码中省略了NullPointer检查
channel.unsafe().register(this, promise);
return promise;
}
该类还覆盖了父类的wakesUpForTask(Runnable task)方法,实现如下:
@Override
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof NonWakeupRunnable);
}
// 标记接口,用于标记不唤醒原生线程的任务
interface NonWakeupRunnable extends Runnable { }
4.4.6 NioEventLoop
前面铺垫了这么多,终于到了我们的目的地NioEventLoop。NioEventLoop的功能是对注册到其中的Channnel的就绪事件以及对用户提交的任务进行处理,回忆第一章关于Java NIO的讲解,NioEventLoop正是要完成第一章中所示的代码的工作。首先我们从其中的字段开始:
Selector selector; // NIO中的多路复用器Selector
private SelectedSelectionKeySet selectedKeys; // 就绪事件的键值对,优化时使用
private final SelectorProvider provider; // selector的工厂
// 唤醒标记,由于select()方法会阻塞
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final SelectStrategy selectStrategy; // 选择策略
private volatile int ioRatio = 50; // IO任务占总任务(IO+普通任务)比例
private int cancelledKeys; // 取消的键数目
private boolean needsToSelectAgain;
在讲解方法前,我们再回顾一下NioEventLoop的继承体系:
(1).JDK的AbstractExecutorService类定义了任务的提交和执行,留下了newTaskFor()方法用于子类定义执行的任务;
(2).Netty的AbstractEventExecutor类覆盖了newTaskFor()方法,使用PromiseTask表示待执行的任务;
(3).AbstractScheduledEventExecutor类将待执行的调度任务封装为ScheduledFutureTask提交给调度任务队列;
(4).SingleThreadEventExecutor类实现了任务执行器即线程,其覆盖了execute()方法,当使用execute()执行一个任务时,实质是向任务队列提交一个任务;该类中还有一个重要的模板方法run(),在这个方法中执行任务队列中的任务(调度任务队列中的待执行任务移入普通任务队列),留给子类实现;
(5).SingleThreadEventLoop类实现对Channel对象的注册。
从NioEventLoop继承体系的分析可以看出,NioEventLoop要实现的最关键方法就是基类的模板方法run()。是不是已经迫不及待了?好,我们直奔代码:
@Override
protected void run() {
for (;;) {
try {
// 调用select()查询是否有就绪的IO事件
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys(); // 处理就绪的IO事件
runAllTasks(); // 执行完任务队列中的任务
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys(); // 处理就绪的IO事件
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 给定时间内执行任务
}
if (isShuttingDown()) { // 检测用户是否要终止线程
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
try {
Thread.sleep(1000); // 防止连续异常过度消耗CPU
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
从代码中可以看出NioEventLoop完成了三项任务:
(1).轮训Channel选择就绪的IO事件。
(2).处理就绪的IO事件。
(3).处理任务队列中的普通任务(包含调度任务)。
其中第(3)项,我们已经在SingleThreadEventExecutor类中分析过,不再赘述。我们看代码的6-16行即第(1)项,轮询Channel选择就绪的IO事件。这里使用接口SelectStrategy是用户可以选择具体的选择策略,我们主要看默认实现:
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
private final IntSupplier selectNowSupplier = () -> { return selectNow(); };
故默认策略是:如果有普通任务待执行,使用selectNow();否则使用select(boolean oldWakenUp)。NIO的Selector有三个select()方法,它们的区别如下:
select() 阻塞直到有一个感兴趣的IO事件就绪
select(long timeout) 与select()类似,但阻塞的最长时间为给定的timeout
selectNow() 不会阻塞,直接返回而不管是否有IO事件就绪
此外,还有一个重要的wakeUp()方法,其功能是唤醒一个阻塞在select()上的线程,使其继续运行。如果先调用了wakeUp()方法,那么下一个select()操作也会立即返回。此外,wakeUp()是一个昂贵的方法,应尽量减少其调用次数。
有了这些基础知识,我们看本类中与selec()操作有关的方法,首先看selecNow()方法:
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
if (wakenUp.get()) { // wakenUp标记字段为真时,唤醒下一次select()操作
selector.wakeup();
}
}
}
实现也很简单,我们主要看select(boolean oldWakenUp)方法:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// delayNanos返回的是最近的一个调度任务的到期时间,没有调度任务返回1秒
// selectDeadLineNanos指可以进行select操作的截止时间点
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 四舍五入将select操作时间换算为毫秒单位
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) { // 时间不足1ms,不再进行select操作
if (selectCnt == 0) { // 如果一次select操作没有进行
selector.selectNow(); // selecNow()之后返回
selectCnt = 1;
}
break;
}
// 此时有任务进入队列且唤醒标志为假
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow(); // selectNow()返回,否则会耽误任务执行
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 有就绪的IO事件,参数oldWakenUp为真,外部设置wakenUp为真
// 有待执行普通任务,有待执行调度任务
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() ||
hasScheduledTasks()) {
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1; // 截止时间已到(这里可直接break退出)
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector(); // 这里是对JDK BUG的处理
selector = this.selector;
selector.selectNow(); // 重建selector之后立即selectNow()
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
} catch (CancelledKeyException e) {
}
}
本来select操作的代码不会这么复杂,主要是由于JDK BUG导致select()方法并不阻塞而直接返回且返回值为0,从而出现空轮询使CPU完全耗尽。Netty解决的办法是:对select返回0的操作计数,如果次数大于阈值SELECTOR_AUTO_REBUILD_THRESHOLD就新建一个selector,将注册到老的selector上的channel重新注册到新的selector上。阈值SELECTOR_AUTO_REBUILD_THRESHOLD可由用户使用系统变量io.netty.selectorAutoRebuildThreshold配置,默认为512。这里注意for()循环中大量使用了break,含有break的部分才是关键操作,其他部分(其实就只有一处)是为了解决JDK BUG。
为了完全理解这段代码,我们还将讲解一下wakeUp()方法,注意其中的21行和32行代码。回忆一下SingleThreadEventExecutor的execute()方法,其最后有一个wakeUp()方法,作用是添加一个任务后指示是否需要唤醒线程。在NioEventLoop中覆盖了它的实现:
@Override
protected void wakeup(boolean inEventLoop) {
// 外部线程且唤醒标记为假时唤醒
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup(); // 注意此时唤醒标记为真
}
}
select(wakenUp.getAndSet(false)); // run方法调用时
当run方法调用select()方法时,每次都将唤醒标记设置为假,这样线程将阻塞在selector.select(timeoutMillis)方法上。阻塞期间如果用户使用外部线程提交一个任务,会调用上述的wakeup()方法,由于wakenUp唤醒标记为假,selector.wakeup()方法调用,线程唤醒从下一个break跳出,从而执行提交任务。阻塞期间如果外部线程提交多个任务,使用wakenUp唤醒标记使selector.wakeup()操作只执行一次,因为它是一个昂贵的操作,从而提高性能。21行代码进入if执行的前提是有任务且wakenUp唤醒标记为假,如果唤醒标记为真是什么情况呢?那说明由外部线程调用了selector.wakeup()方法,此时下一个select()操作会直接返回,继而从下一个break返回,所以也不会影响已有任务的执行。在run()方法select之后的操作还有这样两行代码:
if (wakenUp.get()) {
selector.wakeup();
}
根据注释的解释是:在select(wakenUp.getAndSet(false))操作set(false)和selector.select(timeout)之间如果有外部线程将唤醒标记wakenUp设置为真且执行selector.wakeup()方法,则selector.select(timeout)的第一个操作立即返回,然后会阻塞在第二次循环的select.select(timeout)方法上,此时唤醒标记wakenUp为真从而阻止外部线程添加任务时唤醒线程,从而造成不必要的阻塞操作。(但是代码在select(timeout)之后的一行使用了hasTasks()判断,如果外部线程提交了任务也能跳出循环。所以这部分代码和注释是不是已失效?)
分析完select操作之后,我们接着分析Netty对IO事件的处理方法processSelectedKeys():
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip()); // 使用优化
} else {
processSelectedKeysPlain(selector.selectedKeys()); // 普通处理
}
}
关于优化,我们将在专门的章节讲述,我们先看普通处理:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return; // 选择键的集合为空直接返回
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) { // IO事件由Netty框架处理
processSelectedKey(k, (AbstractNioChannel) a);
} else { // IO事件由用户自定义任务处理
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
这一部分代码功能就是遍历选择键,其中对选择键的处理有两种方式:Netty框架处理和用户自定义处理。这两种处理方式由register()方式决定:
// Netty框架处理
public ChannelFuture register(final Channel channel, final ChannelPromise promise);
// 用户自定义处理
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task);
注意23-31行代码,什么时候需要再次执行select()操作呢?当取消的选择键达到一定数目时,这个数目在Netty中时CLEANUP_INTERVAL,值为256。也就是每取消256个选择键,Netty重新执行一个selectAgain()操作。这个操作实现使用selector.selectNow()并将needsToSelectAgain标记设置为假。cancle()代码如下:
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
接着分析最为关键的processSelectedKey()方法:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) { // 选择键不再有效
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
// channel已不再该EventLoop,直接返回
if (eventLoop != this || eventLoop == null) {
return;
}
// channel还在EventLoop,关闭channel
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 客户端连接事件
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops); // 连接完成后客户端除了连接事件都感兴趣
unsafe.finishConnect(); // 完成连接
}
// readyOps == 0为对JDK Bug的处理, 防止死循环
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read(); // 读事件以及服务端的Accept事件都抽象为read()事件
if (!ch.isOpen()) {
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 写事件
ch.unsafe().forceFlush();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
可以看出对IO事件的具体处理,委托给NioUnsafe对象处理,由read()、forceFlush()、finishConnect()和close()方法处理具体的IO事件,具体的处理过程,我们将在分析NioUnsafe时讲解。
目前为止,我们已经讲解完了NioEventLoop实现的最关键部分,当然还有一些细节我们需要完善:
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
NioEventLoop由于不使用takeTask()方法,所以使用一个MPSC队列代替基类的LinkedBlockingQueue作为新的任务队列,大大提高了性能。如果你对MPSC(多个生产者一个消费者)队列感兴趣,可自行查看相关资料。
@Override
public int pendingTasks() {
if (inEventLoop()) {
return super.pendingTasks();
} else {
return submit(pendingTasksCallable).syncUninterruptibly().getNow(); // 同步等待结果
}
}
private final Callable<Integer> pendingTasksCallable = () -> {
return NioEventLoop.super.pendingTasks();
};
这一部分代码是使用MPSC队列的副作用,由于MPSC只能由NioEventLoop原生线程访问,否则会发生一些意外情况,所以查询队列大小,也向任务队列提交一个任务同时同步等待结果。
@Override
protected Runnable pollTask() {
Runnable task = super.pollTask();
if (needsToSelectAgain) {
selectAgain();
}
return task;
}
NioEventLoop覆盖了pollTask()的实现,在适当时机执行selector.selectNow()操作。(由于pollTask是在执行普通任务时调用,是否有必要?就算selectNow()有结果也不能处理)
Netty作为一个优化狂魔,将优化做到了极致。回忆处理选择键的事件时,需要遍历其存储容器selectedKeySet,这是一个HashSet,迭代性能不高,那么优化。Netty使用新的SelectedSelectionKeySet代替JDK的HashSet,具体怎么实现的呢?在方法openSelector()中实现,代码不在列出,其思路是:使用反射替换这个容器。
下面我们分析SelectedSelectionKeySet,首先看字段:
private SelectionKey[] keysA;
private int keysASize;
private SelectionKey[] keysB;
private int keysBSize;
private boolean isA = true; // 标记字段,控制使用具体的数组
可以看出SelectedSelectionKeySet使用双数组实现,为什么要这样设计呢?
(1).使用数组提高遍历效率。
(2).遍历时使用一个数组,此时可向另一个数组添加就绪的选择键,防止ConcurrentModificationException异常发生。
再看其中的add()方法:
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false; // 不支持null元素
}
if (isA) {
int size = keysASize;
keysA[size ++] = o; // 就绪的选择键放在末尾
keysASize = size;
if (size == keysA.length) {
doubleCapacityA(); // 双倍扩充容量
}
} else {
int size = keysBSize;
keysB[size ++] = o;
keysBSize = size;
if (size == keysB.length) {
doubleCapacityB();
}
}
return true;
}
从代码中可以看出,两个双数组可以视为无限容量且不支持null元素。由于双数组一个用于遍历,一个用于添加新元素,我们看关键的两个数组切换的方法,其实现也很简单,代码如下:
SelectionKey[] flip() {
if (isA) {
isA = false;
keysA[keysASize] = null; // 最末尾元素显示置为null
keysBSize = 0; // B数组清空,用于添加元素
return keysA; // A数组返回,用于遍历
} else {
isA = true;
keysB[keysBSize] = null;
keysASize = 0;
return keysB;
}
}
分析完对SelectedKeySet的优化,我们看在NioEventLoop中的使用:
// 返回用于遍历的数组
processSelectedKeysOptimized(selectedKeys.flip());
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break; // 注意SelectedKeySet的实现置最末尾元素为null,故必能跳出
}
selectedKeys[i] = null; // 设置为null,帮助GC进行回收
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a); // Netty框架处理
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task); // 用户自定义处理
}
if (needsToSelectAgain) { // 有必要重新选择
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
// 将上一次遍历集合中未处理元素置null,帮助GC回收,防止泄露
selectedKeys[i] = null;
}
selectAgain(); // 未处理元素也将添加到数组中
selectedKeys = this.selectedKeys.flip(); // 取出遍历数组
i = -1; // 遍历数组索引设置为-1是因为之后将执行i++从而还是从0开始遍历
}
}
}
到了这里,我们已经分析完大部分NioEventLoop的工作原理和实现,但Netty的实现远不止这些,比如全局任务执行器GlobalEventExecutor,默认执行器DefaultEventExecutor,以及其他的ThreadPerChannelEventLoop,LocalEventLoop等等,由于我们很懒,所以不再讲述。我们休整一会,然后前往下一个目的地:Netty的优雅退出机制。
网友评论