还是看一下下面的例子,从代码里面,并没有看到,channel是如何注册的。除了一个connect方法。这里就是如何注册到对应的selector,以及如何连接到服务端的。在讲connect方法前,有必要先讲一下NioEventLoopGroup
public class TimeClient {
public static void main(String args[]) {
connect();
}
private static void connect() {
//用于客户端处通道的读写
EventLoopGroup work = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(work).option(ChannelOption.TCP_NODELAY, true).channel(NioSocketChannel.class)
.handler(new TimeClientHandler());
ChannelFuture cf = null;
try {
//一直阻塞,直到连接上服务端
cf = b.connect(ConnectConfig.getHost(), ConnectConfig.getPort()).sync();
//一直阻塞,直到该通道关闭
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//避免线程没有杀死
work.shutdownGracefully();
}
}
}
NioEventLoopGroup的结构图
为了说的简单一些。所以简单的画了一个图
NioEventGroup结构图
接下来根据层次来说说各个接口的功能。
1.EventExecutorGroup。
概况一下:继承了线程池的接口,说明实现类必然是具备线程池的相关功能。
在线程池功能的基础上,新增了几个方法
1.boolean isShuttingDown();
2.Future<?> shutdownGracefully();
3.Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
4.FutFuture<?> terminationFuture();
5.next()
以上方法都是用于管理EventExecutor。
比如需要关闭EventExecutorGroup中所有的EventExecutor。
获取EventExecutorGroup的关闭状态。
2.AbstractEventExecutorGroup
概括一下: 是一个抽象类,但是覆写了线程池功能的相关方法。
主要修改的地方:举个例子,如下
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
执行一个任务,都是选择合适的EventExecutor去执行。
3.MultithreadEventExecutorGroup
概括一下: 是一个抽象类,主要的功能还挺多的。包
括EventExecutor选择的功能的实现
以及EventExecutor选择器的设置
还有EventExecutor的怎么生成,都是在这个类中完成的。
后面会重点讲一下。
4.MultithreadEventLoopGroup
其实,我也没搞懂,这个loop何为loop。
只知道这个接口还实现了EventLoopGroup这个接口(该接口具备了注册channel的功能)
覆写了next方法,返回为EventLoop而不是EventExecutor,EventLoop是EventExecutor的子类。
5.EventExecutor
概括一下:EventExecutor为EventExecutorGroup的子类
新增了一些方法。
1.parent(),返回所在的工作组
2.inEventLoop,判断线程池中的线程,是否当前线程。若是则说明已经开启。
6.EventLoopGroup
概括一下,从接口结构来看
它是EventExecutorGroup的子类,主要是新增了注册通道的功能。
说白了就具备了管理EventLoop的功能。
以及注册通道的功能,以及EventExecutor的功能,是一个执行者。
7.EventLoop
没什么特别的。不过还是得具体看实现。NioEventLoop
bootStrap的注册过程
先看看bootStrap的connect方法的走向,最后定位到BootStrap类的以下方法。从这里可以看到,最后是channel的eventLoop去执行注册。
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
这里由于是NioSocketChannel,所以很简单的,直接看看NioSocketChannel的eventLoop方法的返回类型。最后发现,NioSocketChannel本身并没有实现这个方法。最后定位到父类。返回的类型是NioEventLoop。在讲解NioEventLoop前,要讲一下NioEventLoopGroup的初始化,这里面涉及到NioEventLoop的初始化。
NioEventLoopGroup的初始化
其实就是通过NioEventLoopGroup的构造方法一点一点debug进去。最后定位到MultithreadEventLoopGroup的构造方法。
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
private static final int DEFAULT_EVENT_LOOP_THREADS;
//静态代码块,用于初始化默认的EventLoop线程数,其实就是EventLoop的数量。
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//一开始传入是0,所以用的是默认的线程数
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
再接着往下走。最后来到这个方法。具体内容看注释。
这个方法主要涉及到,事件执行选择器的初始化(EventExecutorChooser用于选择合适的EventChooser),以及事件执行器的初始化(EventExecutor)
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//线程数小于0,直接报错
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//一开始executor 就是Null,所以必然会走到这里。
//executor的执行方法,就是每execute的时候,新创建一个线程去执行任务。具体可以看ThreadPerTaskExecutor的代码。
//当然netty不会那么傻,每次都new一个去执行任务。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//成员变量,用于存储所有的EventExecutor
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
所以这里是重点,怎么去创建EventExecutor的呢?直接往下面走,看后面的代码片段
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//......这里就是有任意一个没有创建成功,就会把所有的EventExecutor都关闭,这里肯定是将EventExecutor对应的线程中断。
}
}
//初始化EventExecutor选择器,这里面其实是根据游标来获取对应的EventExecutor,在next()方法中,其实是使用选择器去选择事件执行器。
chooser = chooserFactory.newChooser(children);
//。。。有一些无关代码,所以暂时先省略
}
EventExecutor是怎么生成的?
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
从入参看,需要传入对应的线程池,以及SelectProvider,选择策略与拒绝处理器。
再接着往下走
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
}
直接看看NioEventLoop的构造方法
public final class NioEventLoop extends SingleThreadEventLoop {
设置最大的任务等待数量
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
...还有一些不重要的,直接忽略了。
}
}
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
//成员变量,用于封装相关的线程池的配置。
private final int maxPendingTasks;
private final Executor executor;
private final RejectedExecutionHandler rejectedExecutionHandler;
private final Queue<Runnable> taskQueue;
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
//设置最大的任务等待数量
this.maxPendingTasks = Math.max(16, maxPendingTasks);
//设置线程池
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//初始化任务队列
taskQueue = newTaskQueue(this.maxPendingTasks);
//设置拒绝策略
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
}
从上面的代码段可以看出。
1.NioEventLoop的父类是一个单线程的事件执行器(SingleThreadEventExecutor )。
2.初始化的过程中,需要设置对应的线程池,拒绝策略,以及任务队列。
所以说到底,EventLoop其实就是EventExecutor的一个增强,应该是叫具体实现。EventLoop在具体执行任务的时候,必然是使用线程池中的线程去执行,加队列的话则使用的是自定义的任务队列。拒绝策略也是使用自定义的拒绝策略。
说完EventLoop是怎么生成的,接下来就可以看看,EventLoop究竟是怎么工作的。
NioEventLoop是怎么工作的?
这里就得回到最初的那段代码,代码位于BootStrap中.
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
这段代码就是执行的核心了。
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
所以我们需要看的就是NioEventLoop的execute方法。
最后发现NioEventLoop并没有覆写这个方法,而是在其父类SingleThreadEventExecutor中进行了覆写。
下面来看看代码
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
用于存储当前处理器的处理线程(其实就是将线程池里面的线程保存在里面)。可以用于判断线程是否已经生成了(一开始thread这个变量必然为空,若与执行的线程不一样)。
private volatile Thread thread;
@Override
public void execute(Runnable task) {
任务为空直接抛异常
if (task == null) {
throw new NullPointerException("task");
}
判断当前线程是否成员变量一致。一开始必然不一致。
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
//将任务直接加到队列中
addTask(task);
} else {
启动线程----下面重点就是说一下这个线程。是如何去执行任务的,因为在整个方面里面,并没有看到整个线程是如何去执行任务的。只看到将任务添加到队列。
startThread();
//将任务直接加入到队列中
addTask(task);
//如果正在暂停 或者 在移除任务则拒绝
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
}
在上面的方法中,并没有看到任务的执行,只是简单的看到任务的添加以及拒绝策略。
所以接下来直接进入那个方法。
private void startThread() {
//如果没有开始
if (state == ST_NOT_STARTED) {
//设为开始 双重锁
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
//线程中断状态
private volatile boolean interrupted;
private void doStartThread() {
assert thread == null;
executor创建一个线程,用于执行以下方法。
executor.execute(new Runnable() {
@Override
public void run() {
将线程池中的任务线程保存到成员变量中,用于后续判断线程是否被用于EventLoop
thread = Thread.currentThread();
//中断状态,则线程中断
if (interrupted) {
thread.interrupt();
}
boolean success = false;
//更新上次的执行时间
updateLastExecutionTime();
try {
运行SingleThreadEventExecutor的run()方法。这里是后面需要重点看的............................................................,只有当run方法结束,success才会变成true。但是run方法其实也是一个死循环,只有关闭的时候,才会停止。这里只指定的是实现类的run方法,就是NioEventLoop中的run方法。
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//死循环,如果是关闭状态或者处于正在关闭状态,则跳出死循环
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
。。。。。。下面都是一些无关的代码,关于如何停止该执行器的
}
});
}
从doStartThread方法中可以看到,线程如果是中断状态,则直接中断。否则会运行run方法,只有run方法,结束了,才会去判断执行器的状态是否关闭或者关闭中,再跳出这个循环。那么这个run方法指的就是NioEventLoop的run方法。下面来看看是如何处理的。
NioEventLoop是如何处理任务的。
@Override
protected void run() {
单线程循环处理任务
for (;;) {
try {
//这里其实对selector的处理,通过准备好的key的数量来决定要不要唤醒selector
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//selector处理准备好的key
processSelectedKeys();
} finally {
处理所有任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
//selector处理准备好的key
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
处理所有任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
从上面代码也可以很清晰的看得到,其实可以理解成一个死循环,处理所有准备好的Key以及任务。
我们接下来看看runAllTask的方法,由于实现都差不多,所以选一个就好了。
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
执行所有的任务
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
没有任务跳出循环
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
到这里,任务的执行就完成了。
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
从上面可以简单的看出,NioEventLoop其实本质上上一个具备了执行注册功能的单线程线程池。注册本质也是一个任务,交由给EventLoop去操作。至于为什么是loop, 因为每个eventEventLoop只会有一个线程,通过线程不断的loop去处理任务队列,netty本身不会创建很多个线程。
举个例子,每个channel的注册,都是通过eventExecutorChooser实现的next方法,去找到合适的EventLoop去执行register方法,随后执行connect。每个eventLoop可以重复利用,做许多事情,通过不断loop去处理任务队列。
下面来看看怎么注册到Selector吧。
入口还是老地方BootStrap的connect方法io.netty.bootstrap.Bootstrap.connect(java.lang.String, int)
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
初始化,以及注册---------------这里是需要重点去看的。直接看内部实现。下一个代码片段。
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
//注册回调完成
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
//注册完必然就是连接了
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// 监听器,暂时没搞懂这里做什么的,直接跳过。这里看上去是,没有完成注册,后续需要完成的操作,不是需要关注的重点。
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
//注册完必然就是连接了
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
}
通道是如何初始化以及注册的
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//通道的生成。这里逻辑比较简单,其实就是通过工厂去生成channel
channel = channelFactory.newChannel();
//初始化,通道的处理器的初始化。。。这里比较深,暂时没看,跳过
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
这里就是重点了。。。。。。。。。。。。。。。。很明显,通过EventLoopGroup去注册通道。看接下来的代码片段。register方法是在MultithreadEventLoopGroup中进行了实现。何为Loop,next方法就体现了。
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
}
NioEventLoopGroup是如何去注册通道的
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
@Override
public ChannelFuture register(Channel channel) {
直接往里面走,感兴趣的可以看看next的实现,先看看register
return next().register(channel);
}
}
这个类实现了next方法,通过EventExecutorChooser去选择合适的EventExecutor
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
@Override
public EventExecutor next() {
return chooser.next();
}
}
注册方法的实现
因为NioEventLoop是SingleThreadEventLoop的子类。register跑到这个类里面去了
NioEventLoop的结构图
下面看看代码片段
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
...一层一层debug,最后定位到的是netty自己包装的channel的一个内部类的register方法
promise.channel().unsafe().register(this, promise);
return promise;
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
给通道设置对应的eventLoop
AbstractChannel.this.eventLoop = eventLoop;
当前线程与eventLoop中的线程不是同一个线程,一开始eventLoop并没有分配线程。所以走得是下面的else分支
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
通过eventLoop去执行。execute的方法在SingleThreadEventExecutor中,所以直接往下走。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
}
}
SingleThreadEventExecutor如何去注册
这里就涉及到我们一开始说的,EventLoop的工作流程。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
//一开始,当然是没有在eventLoop中
if (inEventLoop) {
addTask(task);
} else {
所以这里会开启一个死循环的线程,处理selectorkey以及任务队列中的任务
startThread();
将注册任务添加到队列中
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
}
最后说一下注册的执行
注册方法的执行
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected abstract class AbstractUnsafe implements Unsafe {
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
注册的核心逻辑。点进去一看就是java提供的nio通道本身的注册实现。。。。。。。。。。。。
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
注册实现
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
注册到selector,同时将selectionKey设置到成员变量中。而selector则是eventLoop自带的selector
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
}
至此,通道的注册就完成了。
网友评论