首先我们用无参构造方法创建NioEventLoopGroup
的实例
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
1. NioEventLoopGroup源码
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
通过上面的一系列的构造方法的调用,可以看出
nThreads
=0
executor
=null
selectorProvider
=SelectorProvider.provider()
selectStrategyFactory
=DefaultSelectStrategyFactory.INSTANCE
ejectedExecutionHandlers.reject()
最后传入上面的参数,调用其父类MultithreadEventLoopGroup
的构造方法
2. MultithreadEventLoopGroup源码
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
从这个构造方法中,进行了对
nThreads
的三目运算,因为nThreads
=0
,所以nThreads
被赋值为DEFAULT_EVENT_LOOP_THREADS
,我们来看看这个值多少呢?
具体的赋值语句是
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2))
NettyRuntime.availableProcessors() * 2 的结果是本机处理器核数的两倍,在默认情况下,系统中不存在以"io.netty.eventLoopThreads"
为key的键值对。所以最终结果为核数的两倍。(如果手动设置"io.netty.eventLoopThreads"的值,这最后的结果是取设置的值和1中最大的值)
下一个该注意的点:这里构造方法通过一个可变长的参数,将
selectorProvider
selectStrategyFactory
ejectedExecutionHandlers.reject()
都转成了Object
类型
注意:这里是一个重点,NioEventLoopGroup
默认初始化多少线程数:处理器核数的两倍
我们再接着看调用的父类的构造方法
3. MultithreadEventExecutorGroup源码(这段代码较长,详细分析就直接在源码中了)
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
/*
nThreads = 两倍的处理器核数
executor = null
selectorProvider = SelectorProvider.provider()
selectStrategyFactory=DefaultSelectStrategyFactory.INSTANCE
ejectedExecutionHandlers.reject()
*/
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// nThreads 大于等于1
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// executor 等于 null
if (executor == null) {
// 下面为executor进行初始化
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// children初始化
children = new EventExecutor[nThreads];
// 通过for循环为children赋值
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 这里我们来看看,newChild方法做了什么,在本类中,该方法是抽象的,实现类为NioEventLoopGroup
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 这里生成了一个EventExecutorChooser 实例,我看看newChooser方法吧
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
protected ThreadFactory newDefaultThreadFactory() {
// 将本类的Class对象传入
return new DefaultThreadFactory(getClass());
}
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
DefaultThreadFactory源码
private static final AtomicInteger poolId = new AtomicInteger();
private final AtomicInteger nextId = new AtomicInteger();
private final String prefix;
private final boolean daemon;
private final int priority;
protected final ThreadGroup threadGroup;
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, System.getSecurityManager() == null ?
Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
/*
poolName = "multithreadEventExecutorGroup"
daemon = false
priority = Thread.NORM_PRIORITY (也就是5,一般等级)
threadGroup线程组对象
*/
if (poolName == null) {
throw new NullPointerException("poolName");
}
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
// 下面就是一些简单的赋值操作了,设置好了这个线程工厂的线程前缀名称,域,优先级,线程组
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
public static String toPoolName(Class<?> poolType) {
// poolType = MultithreadEventExecutorGroup.class
if (poolType == null) {
throw new NullPointerException("poolType");
}
// poolName为"MultithreadEventExecutorGroup",字符串长度大于1,所以直接进入第三个判断分支
String poolName = StringUtil.simpleClassName(poolType);
switch (poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
// 通过这段代码,主要是将"MultithreadEventExecutorGroup"第一个字母转成小写,所以最后的返回为"multithreadEventExecutorGroup"
if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
} else {
return poolName;
}
}
}
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
StringUtil源码
private static final char PACKAGE_SEPARATOR_CHAR = '.';
public static String simpleClassName(Class<?> clazz) {
/*
clazz = MultithreadEventExecutorGroup.class
所以className = "io.netty.util.concurrent.MultithreadEventExecutorGroup"
最后返回的 className.substring(lastDotIdx + 1)为
"MultithreadEventExecutorGroup"
*/
String className = checkNotNull(clazz, "clazz").getName();
final int lastDotIdx = className.lastIndexOf(PACKAGE_SEPARATOR_CHAR);
if (lastDotIdx > -1) {
return className.substring(lastDotIdx + 1);
}
return className;
}
ThreadPerTaskExecutor源码
// 一个线程工厂,可以直接通过execute执行任务
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
// DefaultThreadFactory
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
NioEventLoopGroup源码
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
/*
executor 为 ThreadPerTaskExecutor的实例
args[0] = SelectorProvider.provider()
args[1] = DefaultSelectStrategyFactory.INSTANCE
args[2] = ejectedExecutionHandlers.reject()
*/
// 这里创建了一个NioEventLoop实例
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop源码
public final class NioEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
// Workaround for JDK NIO bug.
//
// See:
// - http://bugs.sun.com/view_bug.do?bug_id=6427854
// - https://github.com/netty/netty/issues/203
static {
final String key = "sun.nio.ch.bugLevel";
final String bugLevel = SystemPropertyUtil.get(key);
if (bugLevel == null) {
try {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
System.setProperty(key, "");
return null;
}
});
} catch (final SecurityException e) {
logger.debug("Unable to get/set System Property: " + key, e);
}
}
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
}
}
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final SelectStrategy selectStrategy;
private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 这里我们可以看到,这个是java.nio.channels.Selector,其实我们注册的ClannelHandler,就是注册在这里
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
DefaultEventExecutorChooserFactory源码
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
就不再深入下去了,本文就到这里吧。
网友评论