在编写一个基于netty的网络应用中,代码的整体结构都一样,不管是写一个基于nio的还是Oio的netty应用,但是用的最多的显著是基于nio的。本文及后续的几篇文章通过一个简单的服务端demo来分析下其启动流程,从而更好的理解netty的整体核心组件之间是怎么串联一起工作的。以下面这个例子开始
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
} finally {
parentGroup.shutdownGracefully().sync();
childGroup.shutdownGracefully().sync();
}
在上述例子中,首先生成了2个NioEventLoopGroup对象。一个为parentGroup,一个为childGroup。parentGroup负责分配EventLoop处理parent channel在其生命周期过程中发生的事件,parentGroup负责分配EventLoop处理child channel(当parent channel在接受客户端的连接请求后建立的新连接)在其生命周期过程中发生的事件。
本文主要来探讨下对NioEventLoopGroup的初始化。在了解其初始化流程之前,先来看下其构造方法的几个主要参数。
nThreads:设置group的线程数
executor:任务执行器
threadFactory:线程工厂
selectorProvider:获取SelectableChannel和Selector的工具类
selectStrategyFactory:SelectStrategy工厂,控制selector.select方法的行为策略,在eventLoop的事件循环中会使用到。
chooserFactory:选择工厂(创建选择算法,该算法用于从group中选择出eventLoop来分配给channel)
rejectedExecutionHandler:拒绝执行处理器,当线程池无法执行新提交的任务时的处理机制
在很多情况下,程序中创建一个group对象,都是使用不带参数的构造方法,如new NioEventLoopGroup(),那么netty就会使用默认的机制提供上述的几个参数并对其相关成员进行初始化,
设置线程数
线程数设置方法如下:
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
若没有设置eventLoopThreads的属性值,则会获取当前系统CPU核心数的2倍。
设置executor
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
默认的executor是对每个提交的任务,根据提供的线程工厂创建一个新线程来异步执行。
设置chooserFactory
chooserFactory用于提供选择算法,group使用该算法来选择eventLoop来分配给channel,处理其生命周期中发生的事件,netty默认提供2种,如下
//方式1
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
//方式2
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
selectorProvider
selector和selectableChannel的提供者。默认使用jdk的SelectorProvider.provider()。详情请参考nio之Selector
rejectedExecutionHandler
当线程池的任务队列已满且没有可用的线程来执行新提交的任务,此时设置线程池的拒绝策略,jdk一共提供了4种,分别为直接丢弃、抛异常、执行execute方法所在的线程来执行、替换队列的任务。netty默认选择的是抛异常的方式。
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
selectStrategyFactory
selectStrategyFactory负责产生selectStrategy,netty中提供一个默认的选择策略对象。选择策略用于控制select循环的行为,比如,当前有事件需要被立即处理,那么阻塞的select操作可以被延迟或者跳过。
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
对基于nio的实现来说,只有2种情况,一种是若当前EventLoop有待处理的任务,那么先执行待处理的任务,若没有则进行select。
初始化
当了解了EventLoopGroup的几个参数作用后,来看下其构造方法的初始化流程
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//在此提供一个默认的executor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// 省略
} finally {
if (!success) {
// 此处省略..
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
- 首先会初始化一个executor对象,然后根据线程数创建EventExecutor数组。
- 对EventExecutor数组的每个元素赋值,这里会生成NioEventLoop,并将group的相关初始化参数传给NioEventLoop。
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
初始化NioEventLoop时,有个地方要注意下this.executor=ThreadExecutorMap.apply(executor, this)
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
//将eventExecutor(这里为NioEventLoop)与当前的Thread关联起来,那么当runnable的执行过程中,便可通过currentThread获取到其关联的NioEventLoop(底层使用ThreadLocal对象来实现)
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
//当runnable执行完了,再将currentThread和NioEventLoop解除关联
setCurrentEventExecutor(null);
}
}
};
}
- 根据生成的EventExecutor的个数来决定EventExecutorChooser。
- 给每个EventExecutor的terminalFuture对象(Promise)添加一个监听器,当所有的EventExecutor被shutdown后,当前group对象的terminalFuture的状态被设置为成功。
- 将生成的EventExecutor数组赋值给group对象。
网友评论