NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
进来看看构造函数:
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor)null);
}
好吧,直接看最根本的:
public NioEventLoopGroup(int nThreads) {
super(nThreads, (Executor)null, new Object[]{SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE, RejectedExecutionHandlers.reject()});
}
现在进入父类:MultithreadEventLoopGroup:
// 获取“io.netty.eventLoopThreads”的值,没有的话,就设置成可用的处理器数量的2倍
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// 没有设置线程数的话,就是默认为可用的处理器数量的2倍
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
再深入一步,进入MultithreadEventExecutorGroup:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
好吧,最终的实现在这里:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
// 用来创建终止监听器(不是重点)
this.terminatedChildren = new AtomicInteger();
// 用来处理终止事件(不是重点)
this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
// 保证nThreads参数是合理的
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
} else {
if (executor == null) {
// 注意这个
// 创建一个任务执行器,因为executor确实就是null,所以会调用这一步。
executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
}
// 初始化EventExecutor数组长度,也就是说,NioEventLoopGroup会创建一个指定线程个数的EventExecutor数组,不难猜想,一个线程对应数组中的一个EventExecutor
this.children = new EventExecutor[nThreads];
int j;
// 下面就是给EventExecutor数组的每一个位置放上实例对象
for(int i = 0; i < nThreads; ++i) {
boolean success = false;
boolean var18 = false;
try {
var18 = true;
// 注意这个
// 这里就是给每一个索引位置赋值
this.children[i] = this.newChild((Executor)executor, args);
success = true;
var18 = false;
} catch (Exception var19) {
throw new IllegalStateException("failed to create a child event loop", var19);
} finally {
if (var18) {
if (!success) {
int j;
// 失败的话,就挨个“优雅关闭”
for(j = 0; j < i; ++j) {
this.children[j].shutdownGracefully();
}
// 还要挨个检查一下是否所有的EventExecutor里面的任务都已经完成
for(j = 0; j < i; ++j) {
EventExecutor e = this.children[j];
try {
// 如果没有完成
while(!e.isTerminated()) {
// 等待所有任务完成
e.awaitTermination(2147483647L, TimeUnit.SECONDS);
}
} catch (InterruptedException var20) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 下面这一段的逻辑和上面一样,也是判断创建失败的情况
if (!success) {
for(j = 0; j < i; ++j) {
this.children[j].shutdownGracefully();
}
for(j = 0; j < i; ++j) {
EventExecutor e = this.children[j];
try {
while(!e.isTerminated()) {
e.awaitTermination(2147483647L, TimeUnit.SECONDS);
}
} catch (InterruptedException var22) {
Thread.currentThread().interrupt();
break;
}
}
}
}
// 好了,EventExecutor数组准备好了,现在把它装在一个分发器上面,分发规则由this.chooser来决定
this.chooser = chooserFactory.newChooser(this.children);
// 这个很明显,创建了一个监听器
FutureListener<Object> terminationListener = new FutureListener<Object>() {
public void operationComplete(Future<Object> future) throws Exception {
// 这个很好理解,就是看terminatedChildren增长后的位置是否和数组的长度一致,说白了,就是判断是否终止
if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
}
}
};
EventExecutor[] var24 = this.children;
j = var24.length;
// 把上面创建的监听器给每一个EventExecutor,监听终止事件
for(int var26 = 0; var26 < j; ++var26) {
EventExecutor e = var24[var26];
e.terminationFuture().addListener(terminationListener);
}
// 把EventExecutor数组放进一个不可修改的集合中
Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length);
Collections.addAll(childrenSet, this.children);
this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
}
关于上面的一个chooser:
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() {
}
// 根据EventExecutor的数量来决定选用哪种分发器,
// 2的n次方就用PowerOfTwoEventExecutorChooser
// 否则就用GenericEventExecutorChooser
public EventExecutorChooser newChooser(EventExecutor[] executors) {
return (EventExecutorChooser)(isPowerOfTwo(executors.length) ? new DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser(executors) : new DefaultEventExecutorChooserFactory.GenericEventExecutorChooser(executors));
}
// 判断是不是2的n次方
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
// 主要是这个方法,不是2的n次方就通过取模的方式获取下一个NioEventLoop
public EventExecutor next() {
return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)];
}
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
// 2的n次方,就通过相与,相对比取模效率高,建议使用的时候数量设置为2的n次方
public EventExecutor next() {
return this.executors[this.idx.getAndIncrement() & this.executors.length - 1];
}
}
}
好吧,总结一下,new NioEventLoopGroup(n)就是创建了一个n个元素的EventExecutor数组。大概就是这样:
image-20200327230759313.png
EventExecutor是个啥呢?
咱们先来看看ThreadPerTaskExecutor是个啥:
// 利用ThreadFactory创建一个ThreadPerTaskExecutor
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = (ThreadFactory)ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
// 正儿八经地执行还是交给了threadFactory创建出来的线程,说白了,就是“外包”给ThreadFactory去创建线程执行
public void execute(Runnable command) {
this.threadFactory.newThread(command).start();
}
threadFactory就是这个DefaultThreadFactory的实例对象:
public Thread newThread(Runnable r) {
// 这里就是对jdk原生的Thread做了一层包装,反正还是一个Thread
Thread t = this.newThread(FastThreadLocalRunnable.wrap(r), this.prefix + this.nextId.incrementAndGet());
try {
if (t.isDaemon() != this.daemon) {
t.setDaemon(this.daemon);
}
if (t.getPriority() != this.priority) {
t.setPriority(this.priority);
}
} catch (Exception var4) {
}
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(this.threadGroup, r, name);
}
所以说,这个ThreadPerTaskExecutor就是一个专门执行任务的执行器
现在,咱们可以回到如何创建EventExecutor了:
this.children[i] = this.newChild((Executor)executor, args);
在MultithreadEventExecutorGroup中,newChild方法是一个抽象方法:
protected abstract EventExecutor newChild(Executor var1, Object... var2) throws Exception;
所以咱们还是要回到子类NioEventLoopGroup上:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// 如果args里面有四个参数,那么最后一个参数就是EventLoopTaskQueueFactory,否则,就没有设置EventLoopTaskQueueFactory。根据上下文可以知道,args只有三个元素,所以queueFactory=null
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory)args[3] : null;
// 调用构造函数创建一个NioEventLoop,说明NioEventLoop是EventExecutor的子类
return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2], queueFactory);
}
进一步:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
// 这个可以不看,断言后赋值
this.provider = (SelectorProvider)ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
// 这个可以不看,断言后赋值
this.selectStrategy = (SelectStrategy)ObjectUtil.checkNotNull(strategy, "selectStrategy");
//需要注意,这是优化点
NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
// 这是优化后的Selector
this.selector = selectorTuple.selector;
// 这是jdk的Selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
进父类构造函数看看:
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
// 这个tailTasks=null
this.tailTasks = (Queue)ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
再进一步:
// 这就是一个线程池,它的作用就是从任务队列中不断地取出任务执行。详细分析可了解线程池的工作原理
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
super(parent);
this.threadLock = new CountDownLatch(1);
this.shutdownHooks = new LinkedHashSet();
this.state = 1;
this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = (Queue)ObjectUtil.checkNotNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
至此,我们可以得出NioEventLoop就是一个干活的线程池,但是它又不是一个普通的线程池,它是一个专门处理监听selector事件的线程池。
image-20200327222418277.png
也就是说,new NioEventLoopGroup(2)就是创建了一个线程池数组,里面有两个线程池。
下面来看一下优化点:
private NioEventLoop.SelectorTuple openSelector() {
final AbstractSelector unwrappedSelector;
try {
unwrappedSelector = this.provider.openSelector();
} catch (IOException var7) {
throw new ChannelException("failed to open a new selector", var7);
}
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new NioEventLoop.SelectorTuple(unwrappedSelector);
} else {
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
return Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
} catch (Throwable var2) {
return var2;
}
}
});
if (maybeSelectorImplClass instanceof Class && ((Class)maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
final Class<?> selectorImplClass = (Class)maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// 下面是通过反射修改原生selector中的两个属性值:selectedKeys和publicSelectedKeys
// 将本来是HashSet的数据结构,修改成了数组
// 目的就是当io事件变多时,HashSet数据量会变大,也意味着更容易产生hash冲突导致产生链表
// 链表的add操作的时间复杂度相对于数组的操作来说,就不是一倍两倍了
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1L && publicSelectedKeysFieldOffset != -1L) {
PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
} else {
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
} else {
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
}
}
} catch (NoSuchFieldException var7) {
return var7;
} catch (IllegalAccessException var8) {
return var8;
}
}
});
if (maybeException instanceof Exception) {
this.selectedKeys = null;
Exception e = (Exception)maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new NioEventLoop.SelectorTuple(unwrappedSelector);
} else {
this.selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new NioEventLoop.SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
} else {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable)maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new NioEventLoop.SelectorTuple(unwrappedSelector);
}
}
}
看看优化的本质,在SelectedSelectionKeySet中:
// 时间复杂度就是O(1),常数级别
public boolean add(SelectionKey o) {
if (o == null) {
return false;
} else {
// 直接后面追加
this.keys[this.size++] = o;
if (this.size == this.keys.length) {
// 扩容
this.increaseCapacity();
}
return true;
}
}
private void increaseCapacity() {
// 不够就翻倍
SelectionKey[] newKeys = new SelectionKey[this.keys.length << 1];
System.arraycopy(this.keys, 0, newKeys, 0, this.size);
this.keys = newKeys;
}
以上就是我对于new NioEventLoopGroup()操作的简单分析。
网友评论