1、NioEventLoopGroup
-
NioEventLoopGroup属于一个死循环的线程组,是一个执行流程的入口进行分析
-
当NioEventLoopGroup(1)的构造方法1表示指定一个线程来执行
-
NioEventLoopGroup事件循环中的发生,事件的连接建立,而且NioExcecutorGroup会注册到Channel
NioEventLoop、EventLoopGroup、NioEventLoopGroup关系图
- NioEventLoop
- EventLoopGroup
- NioEventLoopGroup
2、 ChannelFuture与ChannelPromise
- ChannelFuture将Channel注册到EventLoop中
- ChannelPromise继承ChannelFuture,里面是包含了一个Channel的方法的引用(难点)
ChannelFuture与ChannelPromise关系图
- ChannelFuture
- ChannelPromise
3、SelectorProvider
- SelectorProvider创建Selector选择器
4、 static关键字
- 用于修饰方法或变量静态使用
- static静态代码块在Class类被加载JVM之前会初始化,不用等待实例化才被使用
5、EventExecutor
-
EventExecutor继承EventExecutorGroup类,提供一些便捷的方法操作
EventExecutor
6、Netty中的bossGroup为什么使用线程池的原因大家众所纷纭
the creator of Netty says multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps, but I don’t see the reason for it.
意思就是说:netty作者说:我们在不同的服务器引导之间共享NioEventLoopGroup,多个boss线程是有用的,但我没有看到它的
7、NioEventLoopGroup源码剖析笔录
- NioEventLoopGroup构造方法
//创建一个实例,使用默认线程数、默认ThreadFactory和SelectorProvider()返回的SelectorProvider.provider()创建一个新实例。
public NioEventLoopGroup() {
this(0);
}
- 指定线程数
// 创建一个实例,指定线程数、默认ThreadFactory和SelectorProvider()返回的SelectorProvider.provider()创建一个新实例。
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
8、MultithreadEventLoopGroup源码剖析笔录
- 处理CPU核心数
//默认事件循环线程
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
//根据可用进程的CPU进行相乘
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
//启动debug就打印log信息
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
- MultithreadEventExecutorGroup的构造器:DefaultEventExecutorChooserFactory.INSTANCE 默认事件执行选择器工程实例
/**
* 创建一个实例
* @param nThreads 该实例将使用的线程数
* @param executor 将要使用的executor, 默认为null
* @param args 参数将传递给每个newChild(Executor, Object…)调用
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
/**
* 最终的创建实例构造器
*
* @param nThreads 该实例将使用的线程数
* @param executor 将要使用的executor, 默认为null
* @param chooserFactory 将要使用的EventExecutorChooserFactory
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
/** 1.初始化线程池 */
//参数校验nThread合法性,
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//executor校验非空, 如果为空就创建ThreadPerTaskExecutor, 该类实现了 Executor接口
// 这个executor 是用来执行线程池中的所有的线程,也就是所有的NioEventLoop,其实从
//NioEventLoop构造器中也可以知道,NioEventLoop构造器中都传入了executor这个参数。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//这里的children数组, 其实就是线程池的核心实现,线程池中就是通过指定的线程数组来实现线程池;
//数组中每个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。
children = new EventExecutor[nThreads];
//for循环实例化children数组,NioEventLoop对象
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//newChild(executor, args) 函数在NioEventLoopGroup类中实现了,
// 实质就是就是存入了一个 NIOEventLoop类实例
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//如果构造失败, 就清理资源
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}//end foreach
/** 2.实例化线程工厂执行器选择器: 根据children获取选择器 */
chooser = chooserFactory.newChooser(children);
/** 3.为每个EventLoop线程添加 线程终止监听器*/
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
/** 4. 将children 添加到对应的set集合中去重, 表示只可读。*/
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
9、ServerBootstrap类的bind绑定端口的源码详解
- bind绑定端口的方法详解
//1、从bind点击进去就跟踪到ChannelFuture 的bind方法
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8080)).sync();
//2、从bind方法的doBind看到处理SocketAddress
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
//3、 doBind方法里面有initAndRegister初始化和注册方法可以跟踪到
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化和注册ChannelFuture
final ChannelFuture regFuture = initAndRegister();
// 从ChannelFuture 获取通道
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// 此时,我们知道注册已经完成并成功。
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
//但只是以防万一,注册的期望(未来)几乎总是已经完成了。
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//这里是用到监听器进行检测ChannelFuture (管道期望完成状态结果)
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// EventLoop上的注册失败,因此通道承诺失败,直接导致
//一旦我们试图访问通道的EventLoop,就会出现IllegalStateException。
promise.setFailure(cause);
} else {
// 注册成功,因此设置要使用的正确执行程序。
// See https://github.com/netty/netty/issues/2586
promise.registered();//调用promise已经注册
//处理通道,ChannelFuture,SocketAddress,ChannelPromise
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
//4、从第3步doBind方法里面的initAndRegister跟踪到
final ChannelFuture initAndRegister() {
ChannelFuture regFuture = config().group().register(channel);
}
// `此config()方法是Bootstrap类的config() ,而config() .group()是返回 AbstractBootstrapConfig#EventLoopGroup的对象`
//从 config().group().register(channel);此时跟踪到EventLoopGroup这个类
public final BootstrapConfig config() {
return config;
}
- 从 config().group().register(channel);此时跟踪到EventLoopGroup这个类,从 EventLoopGroup看出ChannelFuture register(Channel channel);
- EventLoopGroup
EmbeddedEventLoop
MultithreadEventLoopGroup
SingleThreadEventLoop
ThreadPerChannelEventLoopGroup
- 再跟踪到EmbeddedEventLoop类,的register(Channel channel) 这个方法,然后再跟踪到register
//跟到注册
@Override
public ChannelFuture register(Channel channel) {
//此返回的register是下面的代码的 ChannelFuture register(ChannelPromise promise)
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
*此行代码 promise.channel().unsafe().register(this, promise);跟踪到如图
- unsafe
-
AbstractChannel -->AbstractUnsafe#register 截图
AbstractUnsafe的register 方法 -
AbstractChannel -->AbstractUnsafe#register代码
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
//判断事件循环是否在AbstractEventExecutor事件执行器里面
if (eventLoop.inEventLoop()) {
//进行注册ChannelPromise(管道期望)
register0(promise);
} else {
try {
//第一次调用事件循环执行eventLoop.execute()
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
网友评论