NioEventLoopGroup
是一个可处理I/O操作的多线程事件循环。Netty提供了多种EventLoopGroup
的实现用于不同类型的传输。
在之前的例子中,实现了简单的客户端应用,并且使用了两个NioEventLoopGroup
,通常第一个叫做boss
接受收到的连接请求。第二个通常叫做worker
,只要boss
接受了连接请求并将注册连接请求到worker
,它就会处理接收连接的传输。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
如果不指定线程数量,默认值是0
public NioEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
当默认为0的时候,会创建多少线程呢?
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
可以看到DEFAULT_EVENT_LOOP_THREADS
,开辟的线程数量是默认的cpu处理器数量 x 2
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
生成处理任务的多线程执行器
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
然后查看ThreadPerTaskExecutor
类
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
可以看到使涉及到了两个类Executor
和ThreadFactory
,ThreadPerTaskExecutor
用于在一个新的线程,非调用者线程去执行任务。
/**
An object that creates new threads on demand. Using thread factories removes hardwiring of calls to new Thread, enabling applications to use special thread subclasses, priorities, etc.
The simplest implementation of this interface is just:
class SimpleThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
return new Thread(r);
}
}
The Executors.defaultThreadFactory method provides a more useful simple implementation, that sets the created thread context to known values before returning it.
*/
public interface ThreadFactory {
Thread newThread(Runnable r);
}
这里DefaultThreadFactory
是ThreadFactory
的实现
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
接下来的方法,children = new EventExecutor[nThreads];
private final EventExecutor[] children;
而EventExecutor
和EventExecutorGourp
的描述和继承关系如下
/**
The EventExecutor is a special EventExecutorGroup
which comes with some handy methods to see if a Thread is executed in a event loop. Besides this, it also extends the EventExecutorGroup to allow for a generic way to access methods.
*/
public interface EventExecutor extends EventExecutorGroup {
}
/**
* The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use
* via its {@link #next()} method. Besides this, it is also responsible for handling their
* life-cycle and allows shutting them down in a global fashion.
*
*/
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
}
EventExecutorGroup
负责提供EventExecutor
通过使用它的next()
方法。除此之外,它还负责处理它们的生命周期,并允许在全局范围内关闭它们.
接下来,根据线程数循环添加NioEventLoop
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
children[i] = newChild(executor, args);
}
而方法newChild()
方法的实现如下:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
/*
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
*/
for循环执行结束之后,再看children
这个对象
接下来代码会执行到
chooser = chooserFactory.newChooser(children);
会进入EventExecutorChooserFactory
的实现类DefaultEventExecutorChooserFactory
执行此方法
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
返回一个实现了EventExecutorChooser
的类PowerOfTwoEventExecutorChooser
,其next()
方法中,使用轮询的方式选取下一个EventExecutor
Listens to the result of a Future. The result of the asynchronous operation is notified once this listener is added by calling Future.addListener(GenericFutureListener).
最后,添加监听器,以异步处理回调结果
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
将children内的所有元素添加到一个readonlyChildren
中
/**
* private final Set<EventExecutor> readonlyChildren;
*/
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
网友评论