上一章介绍NioEventLoop
的实现原理,但是我们在 netty
一般都是直接使用 NioEventLoopGroup
类,直接创建一个事件轮询器组。
- 事件轮询器组
EventExecutorGroup
的实现和事件轮询器EventExecutor
的实现,其实两个分支。- 我们前面几章已经详细讲解了事件轮询器
EventExecutor
的实现逻辑,下面我们讲解事件轮询器组EventExecutorGroup
的实现,它的实现比较简单。
一. AbstractEventExecutorGroup类
/**
* EventExecutorGroup实现的抽象基类。
*/
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
@Override
public Future<?> submit(Runnable task) {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理任务 task
return next().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理任务 task
return next().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理任务 task
return next().submit(task);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理计划任务
return next().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理计划任务
return next().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理计划任务
return next().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理计划任务
return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
@Override
public Future<?> shutdownGracefully() {
// 优雅关闭 EventExecutorGroup
// 它会关闭所管理的所有 EventExecutor
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
*/
@Override
@Deprecated
public abstract void shutdown();
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
*/
@Override
@Deprecated
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理任务
return next().invokeAll(tasks);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理任务
return next().invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理任务
return next().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理任务
return next().invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
// 通过 next() 方法,选择管理的一个 EventExecutor 处理任务
next().execute(command);
}
}
这个是事件轮询器组EventExecutorGroup
实现的抽象基类。
你会发现它所有处理任务的方法,都是通过
next()
交给它所管理的子事件轮询器EventExecutor
来处理。
二. MultithreadEventExecutorGroup 类
/**
* EventExecutorGroup实现的抽象基类,
* 它可以同时用多个线程处理它们的任务。
*/
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
// 该事件轮询器组所管理的所有事件轮询器
private final EventExecutor[] children;
// 不可修改的事件轮询器集合
private final Set<EventExecutor> readonlyChildren;
// 记录处于终止状态的子事件轮询器的数量
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
// 子事件轮询器的选择器
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
/**
* 创建一个新实例。
* @param nThreads 该事件轮询器组的线程数
* @param threadFactory 线程创建工厂
* @param args 将传递给每个newChild(Executor, Object…)调用的参数
*/
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
/**
* 创建一个新实例。
* @param nThreads 该事件轮询器组的线程数
* @param executor 线程创建工厂
* @param args 将传递给每个newChild(Executor, Object…)调用的参数
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
/**
* 创建一个新实例。
* @param nThreads 该事件轮询器组的线程数
* @param executor
* @param chooserFactory 子事件轮询器的选择器
* @param args 将传递给每个newChild(Executor, Object…)调用的参数
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 根据线程数 nThreads,创建所管理的 EventExecutor 数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 通过 newChild 方法,创建子的 EventExecutor 实例
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 如果创建失败,需要关闭已经创建的子的 EventExecutor
if (!success) {
for (int j = 0; j < i; j ++) {
// 关闭创建的子事件执行器 EventExecutor
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
// 通过 while 循环,确保子事件执行器 EventExecutor 都已经终止 Terminated
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 创建子事件执行器 EventExecutor 的选择器 chooser
chooser = chooserFactory.newChooser(children);
// 创建一个子事件执行器终止的监听器
// 当所有子事件执行器都终止后,就表示这个EventExecutorGroup也终止
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
// 当所有子事件执行器都终止,
// 调用 EventExecutorGroup 的setSuccess 方法,通知关闭
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
// 每个子事件执行器都添加监听器
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
// 返回一个包装的不可修改集合
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
@Override
public EventExecutor next() {
// 通过选择器返回下一个子事件轮询器
return chooser.next();
}
@Override
public Iterator<EventExecutor> iterator() {
return readonlyChildren.iterator();
}
/**
* 返回此事件轮询器组使用的EventExecutor的数量。
* 这个数字是它所使用的线程的1:1映射。
*/
public final int executorCount() {
return children.length;
}
/**
* 子类必须实现,创建属于自己的事件轮询器 EventExecutor
*/
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
// 通过循环遍历,关闭所管理的所有子事件轮询器 EventExecutor
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
@Override
@Deprecated
public void shutdown() {
// 通过循环遍历,关闭所管理的所有子事件轮询器 EventExecutor
for (EventExecutor l: children) {
l.shutdown();
}
}
@Override
public boolean isShuttingDown() {
// 只有当所有子事件轮询器 EventExecutor 都isShuttingDown,返回true
for (EventExecutor l: children) {
if (!l.isShuttingDown()) {
return false;
}
}
return true;
}
@Override
public boolean isShutdown() {
// 只有当所有子事件轮询器 EventExecutor 都isShutdown,返回true
for (EventExecutor l: children) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
@Override
public boolean isTerminated() {
// 只有当所有子事件轮询器 EventExecutor 都 isTerminated,返回true
for (EventExecutor l: children) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
loop: for (EventExecutor l: children) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
break loop;
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
}
这个 MultithreadEventExecutorGroup
类其实为事件轮询器组EventExecutorGroup
奠定了基础啊。
在它的构造方法中,我们发现:
- 它会为每个线程创建一个事件轮询器
EventExecutor
,即线程和事件轮询器一一对应。 - 它会创建一个子事件轮询器选择器
EventExecutorChooser
实例。 - 提供了
EventExecutor newChild(Executor executor, Object... args)
抽样方法,让子类实现,返回子类对应的事件轮询器类型实例。
三. EventExecutorChooser 类
/**
* 创建新的 EventExecutorChooser 的工厂。
*/
@UnstableApi
public interface EventExecutorChooserFactory {
/**
* Returns a new {@link EventExecutorChooser}.
*/
EventExecutorChooser newChooser(EventExecutor[] executors);
/**
* 选择要使用的下一个EventExecutor。
*/
@UnstableApi
interface EventExecutorChooser {
/**
* 返回要使用 EventExecutor。
*/
EventExecutor next();
}
}
/**
* 默认的 EventExecutorChooserFactory 实现
*/
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
// 如果是 2 的幂数,可以使用 & 运算得到余数,效率更快
return new PowerOfTwoEventExecutorChooser(executors);
} else {
// 不是 2 的幂数,只能使用 % 运算得到余数
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
// 返回是不是 2 的幂数
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
// 因为 executors.length 是 2的幂数,可以使用 & 运算得到余数
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
// 只能使用 % 运算得到余数
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
在
netty
提供的默认实现中,如果子事件轮询器的数量是2
的幂数,那么可能更快一点。
四. MultithreadEventLoopGroup 类
/**
* EventLoopGroup实现的抽象基类,
* 它可以同时用多个线程处理它们的任务。
*/
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
* EventExecutorChooserFactory, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
}
这个类没有啥可讲解的。
五. NioEventLoopGroup 类
/**
* MultithreadEventLoopGroup 实现,用于基于NIO选择器的通道。
*/
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/**
* 使用默认线程数、默认ThreadFactory 和SelectorProvider (由SelectorProvider.provider()返回)
* 创建一个新实例。
*/
public NioEventLoopGroup() {
this(0);
}
/**
* 使用指定数量的线程、默认ThreadFactory 和 SelectorProvider(由SelectorProvider.provider()返回)
* 创建一个新实例。
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
/**
* 使用默认线程数、给定的ThreadFactory 和SelectorProvider(由SelectorProvider.provider()返回)
* 创建一个新实例。
*/
public NioEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, SelectorProvider.provider());
}
/**
* 使用指定数量的线程、给定的ThreadFactory和SelectorProvider(由SelectorProvider.provider()返回)
* 创建一个新实例。
*/
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
/**
* 使用指定数量的线程、给定的ThreadFactory和给定的SelectorProvider
* 创建一个新实例。
*/
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler,
final EventLoopTaskQueueFactory taskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory);
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
SelectorProvider selectorProvider,
SelectStrategyFactory selectStrategyFactory,
RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory,
EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
/**
* 设置花在子事件循环中的I/O所需时间的百分比。
* 默认值是50,这意味着事件循环将尝试在I/O上花费与非I/O任务相同的时间。
*/
public void setIoRatio(int ioRatio) {
// 所有管理的子事件轮询器 NioEventLoop,都设置这个 ioRatio
for (EventExecutor e: this) {
((NioEventLoop) e).setIoRatio(ioRatio);
}
}
/**
* 用新创建的选择器替换子事件循环的当前选择器,
* 以解决臭名昭著的epoll 100% CPU错误。
*/
public void rebuildSelectors() {
for (EventExecutor e: this) {
((NioEventLoop) e).rebuildSelector();
}
}
/**
* 创建此事件轮询器组 NioEventLoopGroup 所管理的
* 子事件轮询器 NioEventLoop 实例
*/
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
// 直接 new 出 NioEventLoop 实例
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
}
这个类其实也没有什么好讲的,就是复写
newChild
方法,创建所管理的事件轮询器NioEventLoop
。
六. 总结
你会发现事件轮询器组EventExecutorGroup
的实现非常简单,就是管理子的事件轮询器 EventLoop
。
网友评论