- Netty系列(一):NioEventLoopGroup源码解析
- Netty 源码解析 ——— NioEventLoop 详解
- Netty 源码解析 ——— ChannelConfig 和 A
- Netty 源码解析 ——— 服务端启动流程 (下)
- Netty 源码解析 ——— 服务端启动流程 (上)
- Netty 源码解析 ——— 基于 NIO 网络传输模式的 OP
- Netty 源码解析 ——— Netty 优雅关闭流程
- Netty 源码解析 ——— AdaptiveRecvByteB
- Netty 源码解析 ——— writeAndFlush流程分析
- Netty 源码解析 ——— 基于 NIO 网络传输模式的 OP
前言
对于NioEventLoopGroup
这个对象,在我的理解里面它就和ThreadGroup
类似,NioEventLoopGroup
中有一堆NioEventLoop
小弟,ThreadGroup
中有一堆Thread
小弟,真正意义上干活的都是NioEventLoop
和Thread
这两个小弟。下面的文章大家可以类比下进行阅读,应该会很容易弄懂的。
NioEventLoopGroup
这里咱们可以从NioEventLoopGroup
最简单的无参构造函数开始。
public NioEventLoopGroup() {
this(0);
}
一步步往下走,可以发现最终调用到构造函数:
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
参数说明:
- nThreads:在整个方法链的调用过程中,其值到这里为止一直为0,在没有主动配置的情况下后面会进行设置。若配置
io.netty.eventLoopThreads
系统环境变量,则优先考虑,否则设置成为CPU核心数*2
。 - executor: 到目前为止是
null
。 - selectorProvider: 这里为JDK的默认实现
SelectorProvider.provider()
。 - selectStrategyFactory:这里的值是DefaultSelectStrategyFactory的一个实例
SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory()
。 - RejectedExecutionHandlers:这里是个拒绝策略,这里默认的实现是队列溢出时抛出
RejectedExecutionException
异常。
MultithreadEventLoopGroup
继续往下面走,调用父类MultithreadEventLoopGroup
中的构造函数:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
这里可以看到判断nThreads == 0
后就会给其附上一个默认值。继续走,调用父类MultithreadEventExecutorGroup
中的构造方法。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
DefaultEventExecutorChooserFactory
这里有个关注的点,DefaultEventExecutorChooserFactory
。这是一个chooserFactory,用来生产EventExecutorChooser
选择器的。而EventExecutorChooser
的功能是用来选择哪个EventExecutor
去执行咱们的任务。咱们从下面的代码中可以观察到DefaultEventExecutorChooserFactory
一共给咱们提供了两种策略。
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
这里的策略也很简单:
- 如果给的线程数是2^n个,那么选择
PowerOfTwoEventExecutorChooser
这个选择器,因为这样可以采用位运算去获取执行任务的EventExecutor
。
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
-
GenericEventExecutorChooser
选择器,这里采用的是取模的方式去获取执行任务的EventExecutor
。
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
相比而言,位运算的效率要比取模的效率高,所以咱们在自定义线程数的时候,最好设置成为2^n个线程数。
干正事
到达最终调用的函数
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
//创建NioEventLoop失败后进行资源的一些释放
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//这里可以去看下上面对于 DefaultEventExecutorChooserFactory的一些介绍
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
// 给每一个成功创建的EventExecutor 绑定一个监听终止事件
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
// 弄一个只读的EventExecutor数组,方便后面快速迭代,不会抛出并发修改异常
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
从上面的代码可以观察到,等了很久的executor 在这里终于给其赋值了,其值为ThreadPerTaskExecutor
的一个实例对象,这一块的初始化赋值都是很简单的,干活调用的是如下方法:
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
对这一块不是很了解的可以去查阅下线程池有关的资料,咱们重点关注一下newChild
这个方法,可以说是上面整个流程中的重点:
newChild
newChild
这个方法在NioEventLoopGroup
中被重写了:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
细心的小伙伴可以观察到,这里有用到SelectorProvider,SelectStrategyFactory以及RejectedExecutionHandler这个三个参数,实际上就是本文最开始初始化的三个实例对象(可以翻阅到顶部查看一下)。
继续往下走流程:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
在上面的代码片段中除了调用父类的构造器之外就进行了参数的判空和简单的赋值。这里openSelector
方法调用后返回SelectorTuple
实例主要是为了能同时得到包装前后的selector
与unwrappedSelector
。
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
这里会有一个taskQueue
队列的初始化(Queue<Runnable> taskQueue
),看名字就知道,这个队列里面放着的是咱们要去执行的任务。这里的初始化方法newTaskQueue
在NioEventLoop
中重写了的。具体如下:
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
这里生成的是一个MPSC队列(Multi Producer Single Consumer),这是一个多生产者单消费的无锁队列,支持并发。从字面意思上就可以观察到这个队列效率应该是蛮高的。这里的maxPendingTasks
值为Integer.MAX_VALUE
。然后最终生成的是MpscUnboundedArrayQueue
这样一个无边界的队列。
这样newChild
这个方法到这里就走完了。
terminationListener
简单介绍下这个环节,在上面的创建NioEventLoopGroup
有个环节是给每个NioEventLoop
儿子绑定一个terminationListener监听事件
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
这个事件的回调方法是:
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
在每一个NioEventLoop
关闭后,就会回调这个方法,然后给NioEventLoopGroup
实例中的terminatedChildren
字段自增1,并与初始化成功的NioEventLoop
的总个数进行比较,如果
terminatedChildren
的值与NioEventLoop
的总个数相等,则调用bossGroup.terminationFuture().get()
方法就不会阻塞,并正常返回null
。
同样,future.channel().closeFuture().sync()
这段代码也将不会阻塞住了,调用sync.get()
也会返回null
。
下面给一段测试代码,完整示例大家可以到我的github中去获取:

上面的代码只是一个简单的测试,后面还有别的发现的话会继续在github中与大家一起分享~
网友评论