系列
Netty源码分析 - Bootstrap服务端
Netty源码分析 - Bootstrap客户端
netty源码分析 - ChannelHandler
netty源码分析 - EventLoop类关系
netty源码分析 - register分析
Netty源码分析 - NioEventLoop事件处理
netty源码分析 - accept过程分析
Netty源码分析 - ByteBuf
Netty源码分析 - 粘包和拆包问题
开篇
- 这篇文章的目的主要是为了梳理下EventLoop和NioEventLoopGroup的类关系,慎重阅读,有点枯燥。
- 顺带解释了下Channel+NioEventLoopGroup+EventLoop的注册流程。
NioEventLoop
NioEventLoop- NioEventLoop的继承关系主线如上图所示,NioEventLoop --> SingleThreadEventLoop --> SingleThreadEventExecutor --> AbstractScheduledEventExecutor --> AbstractEventExecutor --> AbstractExecutorService,其中AbstractEventExecutor作为连接JDK原生Executor和Netty的Executor的桥梁。
AbstractExecutorService
AbstractExecutorService- Executor接口只有一个execute()方法。
- ExecutorService接口新增了判断是否shutdown相关的方法、提交任务的submit方法、方法调用的invoke相关的方法。
- AbstractExecutorService新增了newTaskFor的相关方法,并重载实现了submit的相关方法。
- AbstractExecutorService作为JDK的Executor对象(如ThreadPoolExecutor)的父类,比较值得研究下。
AbstractEventExecutor
AbstractEventExecutor- AbstractEventExecutor作为Netty的Executor的基类,起着承上启下的作用。
- AbstractEventExecutor的核心包括submit和schedule其实都是重写了父类的AbstractExecutorService的方法,不过schedule内部没有任何实现。
AbstractScheduledEventExecutor
AbstractScheduledEventExecutor- AbstractScheduledEventExecutor类的核心方法都和schedule()有关,也是真正实现schedule功能的相关的方法。
SingleThreadEventExecutor
SingleThreadEventExecutor- SingleThreadEventExecutor包含任务队列Queue<Runnable> taskQueue,这个任务队列是保存所有任务的核心数据结构,是一个LinkedBlockingQueue数据结构的对象。
- SingleThreadEventExecutor的execute()方法负责提交Runnable对象到对应的taskQueue对象中。
SingleThreadEventLoop
image.png- SingleThreadEventLoop#register把eventLoop对象绑定对应的channel当中。
NioEventLoop
NioEventLoop- NioEventLoop#run方法用于处理该NioEventLoop的任务,是任务执行的核心调度入口。
- NioEventLoop#processSelectedKeys是由NioEventLoop#run内部触发的,负责处理事件。
public final class NioEventLoop extends SingleThreadEventLoop {
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}
}
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private final Queue<Runnable> taskQueue;
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
}
- NioEventLoop的初始化过程,主要关注核心的几个变量包括selectorProvider、selector、selectStrategy、taskQueue。
- selector:这个就是jdk中nio的多路复用器Selector对象,熟悉nio类的人应该很清楚了,每个NioEventLoop会绑定一个selector对象。调用它的select()或者selectNow方法来读取io事件。
- selectStrategy:选择策略SelectStrategy对象。它提供了一种控制select循环行为的能力,要么是select方法阻塞线程,要么是继续循环两种行为。
- provider:SelectorProvider类的对象,它是Selector的工厂类,调用它的openSelector方法即可打开一个新的selector对象。
- ioRatio:表示处理io事件时间占比,默认值是50,即各占一半时间。
NioEventLoopGroup
NioEventLoopGroupEventExecutorGroup
EventExecutorGroupAbstractEventExecutorGroup
AbstractEventExecutorGroupMultithreadEventExecutorGroup
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
// NioEvengLoop的选择器chooser
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
public EventExecutor next() {
return chooser.next();
}
}
MultithreadEventExecutorGroup
MultithreadEventLoopGroup
MultithreadEventLoopGrouppublic abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
public EventExecutor next() {
return chooser.next();
}
}
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
}
- MultithreadEventLoopGroup的register()方法负责选中其中一种NioEventLoop注册channel。
EventLoop注册channel流程
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
// 省略代码
}
// channle绑定eventLoop对象
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
}
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
public EventExecutor next() {
return chooser.next();
}
}
- config().group().register(channel)实现了NioEventLoopGroup通过chooser.next()选择NioEventLoop绑定channel。
网友评论