NioEventLoopGroup
直接拿官方的EchoServer类做分析
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// workGroup没有传入线程数,则默认CPU*2线程
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
直接看
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
EventLoopGroup负责管理一组EventLoop。
boss用来accept客户端连接,worker用来处理客户端数据的读写操作。
那么直接看NioEventLoopGroup的构造方法,它有多个构造方法,最终都会调用如下:
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
- nThreads:线程数
- executor:线程执行器,默认是null
- selectorProvider:默认是KQueueSelectorProvider
- selectStrategyFactory:默认DefaultSelectStrategyFactory
- 还有个默认的RejectedExecutionHandlers:拒绝策略
接着调用父类的构造,抽象类MultithreadEventLoopGroup,这个类继承了MutithreadEventExecutorGroup并实现了EventLoopGroup。用于多线程处理任务。
//这里的DEFAULT_EVENT_LOOP_THREADS在类初始化的时候,静态代码块
//中生成Runtime.getRuntime().availableProcessors() * 2
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
DEFAULT_EVENT_LOOP_THREADS默认为处理器数量的两倍。
上述的构造会调用其父类MultithreadEventExecutorGroup的构造方法,下面会设置其两个属性值,children和chooser
- children(private final EventExecutor[] children):保存EventLoop
- chooser(EventExecutorChooserFactory.EventExecutorChooser chooser):选择EventLoop的策略类
//这里又多了个参数
//这里出现一个EventExecutor工厂选择类,默认是
//DefaultEventExecutorChooserFactory,这个工厂使用轮询来选择
//EventExecutor
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
接着调用该类的构造
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
//默认的ThreadPerTaskExecutor,每次都新建一个FastThreadLocalThread
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//通过newChild方法来创建EventExecutor,可以通过next()方法访问到。
//每个将会为MultithreadEventExecutorGroup服务的线程都会调用这个方法
//这里调用的是NioEventLoopGroup的newChild方法,方法里面创建了NioEventLoop
//这里 EventLoop的构造,接收了NioEventLoopGroup、Executor、
//SelectorProvider、SelectStrategy和RejectedExecutionHandler
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//如果不成功就释放资源
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//这里使用的是DefaultEventExecutorChooserFactory,根据executor的长度选择Chooser
//长度是2的幂次方,则选择PowerOfTwoEventExecutorChooser
//如果不是选择GenericEventExecutorChooser,这里长度为1
//选择前者。Chooser里面提供了next方法
chooser = chooserFactory.newChooser(children);
//创建终止监听器
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
再来看看这幅图,看下Netty中的Reactor模式。就是由一个不断等待和循环的进程(线程)来响应IO请求,IO请求就绪后,指派调用指定的handler做处理。
上面定义的boss就是mainReactor,worker就是subReactor。代码后面再做分析。
总结:
- NioEventLoopGroup管理了一组NioEventLoop
- 它指定了线程数量 核心数*2
- 它创建了executor,默认为ThreadPerTaskExecutor
- 它通过newChild方法创建NioEventLoop,NioEventLoop包含了上面创建的executor
网友评论