EventLoopGroup workerGroup = new NioEventLoopGroup();
### io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
* Create a new instance using the default number of threads, the default {@link ThreadFactory} and
* the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
public NioEventLoopGroup() {
### io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
### MultithreadEventLoopGroup.java:39
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
### 表示代码出自的类及方法名,因为他们的相关性很大,所以我就放到同一个代码块中,防止思维跳跃太大,大家跟不上节奏。虽然现在初始化的时候设置了线程数为0,但是并不是最后的结果,经过了诸多的构造函数的调用及父类构造函数的引用,在这里做了一个转化,当为0时,会取DEFAULT_EVENT_LOOP_THREADS的值,而他的值,他取的是,如果设置io.netty.eventLoopThreads的值就取这个值,没有设置的话,会取默认值可用核数的两倍,同1比较去一个最大的进行赋值。最后我们到达了最好的构造函数
### io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
* Create a new instance.
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
if (executor == null) { //如何executor为空,那么设置默认执行器为ThreadPerTaskExecutor
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
children = new EventExecutor[nThreads]; //MultithreadEventExecutorGroup是一个总的管理的类,具体和线程相关的都交给他的children进行处理,是一个EventExecutor的数组
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args); //初始化每一个EventExecutor的实例,下面会有详细解释
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
chooser = chooserFactory.newChooser(children); //创建线程的选择器,选择是有哪个线程来处理
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
for (EventExecutor e: children) {
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet); //将这些子处理器设置为只读,不能添加
### io.netty.channel.nio.NioEventLoopGroup#newChild
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
### io.netty.channel.nio.NioEventLoop#NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
if (strategy == null) {
throw new NullPointerException("selectStrategy");
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
* Create a new instance
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param executor the {@link Executor} which will be used for executing
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
* @param rejectedHandler the {@link RejectedExecutionHandler} to use.
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks); //设置任务的处理队列,后续任务会添加到这里面
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");